apachekafka® Archives - credativ®

… mit Kafka® Connect und dem Debezium PostgreSQL® Quellconnector

Moderne, verteilte ereignis- und streamingbasierte Systeme machen sich die Idee zu eigen, dass Änderungen unvermeidlich und sogar wünschenswert sind! Ohne Änderungsbewusstsein sind Systeme unflexibel, können sich nicht weiterentwickeln oder reagieren und sind schlichtweg nicht in der Lage, mit Echtzeitdaten aus der realen Welt Schritt zu halten. In einer früheren 2-teiligen Blogserie (Teil 1, Teil 2) haben wir herausgefunden, wie man mit dem Debezium Cassandra Connector Änderungsdaten aus einer Apache Cassandra®-Datenbank erfasst und Echtzeit-Ereignis-Streams in Apache Kafka® erzeugt.

Aber wie kann man einen „Elefanten“ (PostgreSQL®) auf das Tempo eines „Geparden“ (Kafka) bringen?

Geparden sind die schnellsten Landtiere (Spitzengeschwindigkeit 120 km/h, Beschleunigung von 0 auf 100 km/h in 3 Sekunden) – 3-mal schneller als Elefanten (40 km/h). (Quelle: Shutterstock)

1. Der Debezium PostgreSQL Connector

Ähnlich wie der Debezium Cassandra Connector (Blog Teil 1, Teil 2) erfasst auch der Debezium PostgreSQL Connector Datenbankänderungen auf Zeilenebene und überträgt den Stream über Kafka Connect an Kafka. Ein wesentlicher Unterschied besteht jedoch darin, dass dieser Connector als Kafka-Quellconnector ausgeführt wird. Wie lässt sich also vermeiden, dass auf dem PostgreSQL-Server benutzerdefinierter Code ausgeführt werden muss? Aus der Dokumentation geht Folgendes hervor:

„Ab PostgreSQL 10 gibt es einen logischen Replikations-Stream-Modus, genannt pgoutput, der nativ von PostgreSQL unterstützt wird. Das bedeutet, dass ein Debezium PostgreSQL Connector diesen Replikations-Stream nutzen kann, ohne dass zusätzliche Plug-ins erforderlich sind.“

Somit kann der Connector einfach als PostgreSQL Streaming Replication Client ausgeführt werden. Um den Connector auszuführen, müssen Sie ihn herunterladen, in Ihrer Kafka Connect-Umgebung installieren, konfigurieren, PostgreSQL einrichten und dann wie folgt ausführen.

1.1. Debezium PostgreSQL Connector herunterladen

Der Connector kann hier heruntergeladen werden.

1.2. Debezium PostgreSQL Connector installieren

Ich werde hier den Dienst Instaclustr Managed Kafka Connect verwenden. Mit diesem Dienst können benutzerdefinierte Connectors verwendet werden, allerdings müssen sie zunächst in einen AWS S3 Bucket geladen und dann über die Instaclustr-Verwaltungskonsole synchronisiert werden. (Ich habe in meinem S3 Bucket einen Ordner mit dem Namen debezium-connector-postgres erstellt und alle Jars aus dem ursprünglichen Download in diesen Ordner hochgeladen.)
Wenn alles funktioniert hat, sehen Sie in der Liste der verfügbaren Connectors auf der Konsole einen neuen Connector mit dem Namen io.debezium.connector.postgresql.PostgresConnector.

1.3. PostgreSQL konfigurieren

Hier sind die erforderlichen PostgreSQL-Servereinstellungen:

  1. Prüfen Sie wal_level. Wenn dies nicht auf logical steht, setzen Sie es auf logical. (Dazu ist ein Server-Neustart und bei einem verwalteten Dienst ggf. Unterstützung erforderlich.)
  2. Für PostgreSQL > 10+ sind keine zusätzlichen Plug-ins erforderlich, da pgoutput verwendet wird (Sie müssen jedoch das Standard-Plug-in plugin.name in der Konfiguration des Connectors überschreiben, siehe unten).
  3. Benutzerberechtigungen konfigurieren
    a. Laut Anweisungen soll ein Debezium-Benutzer erstellt werden, der über die erforderlichen Mindestrechte verfügt (REPLICATION- und LOGIN-Rechte),
    b. und um pgoutput zu verwenden, benötigen Sie weitere Berechtigungen.

Beachten Sie, dass für diese Einstellungen Administratorrechte für PostgreSQL nötig sind. Wenn Sie also einen verwalteten Dienst verwenden, müssen Sie möglicherweise die Hilfe Ihres Dienstanbieters in Anspruch nehmen, um die notwendigen Änderungen vorzunehmen.

1.4. Debezium PostgreSQL Connector konfigurieren und ausführen

Damit Sie den Connector ausführen können, finden Sie hier ein Beispiel für eine Connector-Konfiguration. Beachten Sie, dass der Standardwert von plugin.name nicht pgoutput ist, weshalb Sie ihn explizit angeben müssen (geben Sie die IP-Adresse, den Benutzernamen und das Passwort für den Kafka Connect-Cluster und die IP-Adresse, den Benutzernamen und das Passwort für die PostgreSQL-Datenbank an):

curl https://KafkaConnectIP:8083/connectors -X POST -H 'Content-Type: application/json' -k -u kc_username:kc_password -d '{
  "name": "debezium-test1",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "PG_IP",
    "database.port": "5432",
    "database.user": "pg_username",
    "database.password": "pg_password",
    "database.dbname" : "postgres",
    "database.server.name": "test1",
    "plugin.name": "pgoutput"
  }
}

Wenn das korrekt funktioniert hat, sehen Sie in der Instaclustr Kafka Connect-Konsole einen einzelnen laufenden Task für debezium-test1. Beachten Sie, dass der Standardwert und auch der einzige zulässige Wert für tasks.max genau 1 ist, sodass Sie ihn nicht explizit festlegen müssen.

1.5. Tabellen-Themen-Zuordnungen mit Debezium PostgreSQL Connector

Vielleicht fällt Ihnen auf, dass in der Konfiguration keine Tabellennamen oder Themen angegeben sind. Das liegt daran, dass der Connector standardmäßig Änderungen für alle Nicht-System-Tabellen erfasst und Ereignisse für eine einzelne Tabelle in ein einzelnes Kafka-Thema schreibt.
Standardmäßig lautet der Name des Kafka-Themas serverName.schemaName.tableName, wobei:
serverName der logische Name des Connectors wie im Konfigurationsmerkmal des Connectors database.server.name angegeben ist (und eindeutig sein muss)
schemaName der Name des Datenbankschemas ist
tableName der Name der Datenbanktabelle ist
Es gibt eine Reihe von Konfigurationsoptionen zum Ein- oder Ausschließen von Schemata, Tabellen und Spalten. (Verwenden Sie nur eine für jedes Objekt.)
Ich konnte keine PostgreSQL Connector-spezifischen Konfigurationsoptionen finden, um die standardmäßige Tabellen-Thema-Zuordnung zu ändern; das liegt jedoch daran, dass Sie generische Debezium Single Message Transforms, SMTs für benutzerdefiniertes Topic Routing, verwenden müssen.

2. Daten-Änderungsereignisse von Debezium PostgreSQL Connector kennenlernen

Ein furchterregender „Giraffosaurus“! (Oder eine T-Raffe?) (Quelle: Shutterstock)

Wenn alles richtig funktioniert, sehen Sie einige Daten-Änderungsereignisse in einem Kafka-Thema. Bei einer Tabelle mit dem Namen test1 lautet der Themenname beispielsweise test1.public.test1. Die Tabelle hat 3 ganzzahlige Spalten (id, v1, v2); id ist der Primärschlüssel.
Wie sehen nun die Kafka-Daten aus? Auf den ersten Blick sehen sie etwas unheimlich aus – in was haben sich die einfachen CRUD-Operationen der Datenbank verwandelt? Dies ist das Ereignis für eine Einfügung:

Struct{after=Struct{id=1,v1=2,v2=3},source=Struct{version=1.6.1.Final,connector=postgresql,name=test1,ts_ms=1632457564326,db=postgres,sequence=["1073751912","1073751912"],schema=public,table=test1,txId=612,lsn=1073751968},op=c,ts_ms=1632457564351}

Für eine Aktualisierung erhalten wir dieses Ereignis:

Struct{after=Struct{id=1,v1=1000,v2=3},source=Struct{version=1.6.1.Final,connector=postgresql,name=test1,ts_ms=1632457801633,db=postgres,sequence=["1140858536","1140858536"],schema=public,table=test1,txId=627,lsn=1140858592},op=u,ts_ms=1632457801973}

Und nach einer Löschung erhalten wir dieses Ereignis:

Struct{before=Struct{id=1},source=Struct{version=1.6.1.Final,connector=postgresql,name=test1,ts_ms=1632457866810,db=postgres,sequence=["1140858720","1140858720"],schema=public,table=test1,txId=628,lsn=1140858776},op=d,ts_ms=1632457867187}
Null

Was fällt uns bei diesen Ereignissen auf? Wie erwartet, entspricht der Operationstyp (c, u, d) der PostgreSQL-Operationssemantik (create – für ein insert, update, delete). Für create und update gibt es einen after-Datensatz, der die ID und die Werte anzeigt, nachdem die Transaktion durchgeführt wurde. Für delete gibt es einen before-Datensatz, der nur die ID enthält, und ein Null für die after-Werte. Außerdem gibt es viele Metadaten, darunter die Zeit, datenbankspezifische Sequenz- und „lsn“-Informationen und eine Transaktions-ID. Mehrere Ereignisse können sich eine Transaktions-ID teilen, wenn sie im selben Transaktionskontext aufgetreten sind.  Wofür ist die Transaktions-ID nützlich? Transaktions-Metadaten , die der txId entsprechen, können in topics mit dem Postfix .transaction geschrieben werden (provide.transaction.metadata ist standardmäßig false).
Diese Daten haben mich zunächst überrascht, da ich nach der ersten Lektüre der Dokumentation etwas besser lesbare (JSON) Änderungsereignisdaten einschließlich Schlüssel- und Wertschemata sowie Nutzdaten erwartet hatte. Aber das „Kleingedruckte“ besagt:

„Daraus, wie Sie den Kafka Connect Converter konfigurieren, den Sie in Ihrer Anwendung verwenden möchten, ergibt sich die Darstellung dieser vier Teile in Änderungsereignissen.“

Offensichtlich war also meine Konfiguration unvollständig. Mit ein bisschen Suchen entdeckte ich die folgenden zusätzlichen Konfigurationseinstellungen: key/value.converter und key/value.schemas.enable werden benötigt, um die Schlüssel- und Werteschemata in die Daten aufzunehmen, und das JSON-Format sollte verwendet werden:

"value.converter": "org.apache.kafka.connect.json.JsonConverter"
"value.converter.schemas.enable": "true"
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
"key.converter.schemas.enable": "true"

Nach der Änderung der Konfiguration und einem Neustart des Connectors sind die generierten Daten zwar viel ausführlicher, aber zumindest jetzt wie erwartet im JSON-Format. Bei einer insert-Operation erhalten wir zum Beispiel dieses lange Ereignis:

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":true,"field":"v1"},{"type":"int32","optional":true,"field":"v2"}],"optional":true,"name":"test1.public.test1.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":true,"field":"v1"},{"type":"int32","optional":true,"field":"v2"}],"optional":true,"name":"test1.public.test1.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"test1.public.test1.Envelope"},"payload":{"before":null,"after":{"id":10,"v1":10,"v2":10},"source":{"version":"1.6.1.Final","connector":"postgresql","name":"test1","ts_ms":1632717503331,"snapshot":"false","db":"postgres","sequence":"[\"1946172256\",\"1946172256\"]","schema":"public","table":"test1","txId":1512,"lsn":59122909632,"xmin":null},"op":"c","ts_ms":1632717503781,"transaction":null}}

Die expliziten Schema-Metadaten machen die Sache ziemlich komplex, also schalten wir sie folgendermaßen ab:

"value.converter.schemas.enable": "false"
"key.converter.schemas.enable": "false"

Dies ergibt einen besser lesbaren Datensatz, der nur die Nutzdaten enthält (oben hervorgehoben, aber beachten Sie, dass „payload“ nicht mehr angezeigt wird):

{"before":null,"after":{"id":10,"v1":10,"v2":10},"source":{"version":"1.6.1.Final","connector":"postgresql","name":"test1","ts_ms":1632717503331,"snapshot":"false","db":"postgres","sequence":"[\"1946172256\",\"1946172256\"]","schema":"public","table":"test1","txId":1512,"lsn":59122909632,"xmin":null},"op":"c","ts_ms":1632717503781,"transaction":null}

Beachten Sie, dass wir jetzt ein before-Feld und ein after-Feld für create-Operationen haben.
Beachten Sie auch, dass ohne explizites Schema der Kafka-Sink-Connector in der Lage sein muss, die Nutzdaten ohne zusätzlichen Kontext zu verstehen, oder Sie müssen alternativ eine Schema Registry verwenden und konfigurieren. Hier sind die Anweisungen für die Verwendung einer Kafka Schema Registry mit dem von Instaclustr verwalteten Kafka-Dienst. Änderungen an der Konfiguration des Debezium-Quellconnectors müssen Folgendes enthalten:

"value.converter": "io.confluent.connect.avro.AvroConverter"
"value.converter.schema.registry.url": "http://schema-registry:8081"

Ich war neugierig, was nach einer truncate-Operation an einer Tabelle passieren würde, aber überraschenderweise wurden überhaupt keine Ereignisse generiert. Ist ein truncate nicht semantisch gleichwertig mit mehreren delete-Operationen? Wie sich herausstellt, sind truncate-Ereignisse standardmäßig ausgeschaltet („truncate.handling.mode“ : „skip“ – nicht „bytes“, wie fälschlicherweise dokumentiert; „include“, um sie einzuschalten).
Die andere Überlegung ist, dass der Kafka-Sink-Connector in der Lage sein muss, truncate-Ereignisse vernünftig zu verarbeiten, was anwendungsspezifisch und/oder Sink-system-spezifisch sein kann. (Z. B. könnte es für Elasticsearch sinnvoll sein, als Reaktion auf ein truncate-Ereignis einen gesamten Index zu löschen. Für die Stream-Verarbeitung hingegen ist es nicht offensichtlich, was eine sinnvolle Reaktion wäre. Allerdings tritt das gleiche Problem vielleicht auch bei Löschungen und Aktualisierungen auf.)

3. Durchsatz des Debezium PostgreSQL Connectors

Wie schnell kann ein Debezium PostgreSQL Connector ausgeführt werden? (Quelle: Shutterstock)

Eine Einschränkung des Debezium PostgreSQL Connectors ist, dass er nur als einzelner Task ausgeführt werden kann. Ich habe einige Auslastungstests durchgeführt und festgestellt, dass ein einzelner Task maximal 7.000 Daten-Änderungsereignisse pro Sekunde verarbeiten kann. Dies entspricht auch den Transaktionen pro Sekunde, solange es nur ein Änderungsereignis pro Transaktion gibt. Bei mehreren Ereignissen pro Transaktion ist der Transaktionsdurchsatz geringer. In einem früheren Blog (Pipeline-Blogserie Teil 9) haben wir 41.000 Einfügungen pro Sekunde in PostgreSQL erreicht. Davon sind 7.000 lediglich 17 %. Dieser Teil der CDC-Pipeline ist also in der Praxis eher ein Elefant als ein Gepard. Typische PostgreSQL-Workloads bestehen jedoch aus eine Mischung an Schreib- und Lesevorgängen, sodass die Schreibrate wesentlich geringer sein kann, was den Debezium PostgreSQL Connector zu einer praktikableren Lösung macht.

Ich habe noch ein weiteres, etwas merkwürdiges Verhalten festgestellt, das Sie vielleicht beachten sollten. Wenn zwei (oder mehr) Tabellen auf Änderungsereignisse überwacht werden und die Last nicht gleichmäßig auf die Tabellen verteilt ist (z. B., wenn ein Batch von Änderungen in einer Tabelle kurz vor der nächsten auftritt), dann verarbeitet der Connector alle Änderungen der ersten Tabelle, bevor er mit den Änderungen für die zweite Tabelle beginnt. Bei dem von mir entdeckten Beispiel kam es zu einer Verzögerung von 10 Minuten. Ich bin mir nicht ganz sicher, was da vor sich geht, aber es sieht so aus, als ob der Connector alle Änderungen für eine Tabelle verarbeiten muss, bevor er zur nächsten Tabelle übergeht. Bei normalen, ausgeglichenen Workloads mag dies in Ordnung sein, aber bei Spitzen-/Batch-Lasten, die eine einzelne Tabelle stark auslasten, kann es Probleme bei der rechtzeitigen Verarbeitung von Änderungsereignissen aus anderen Tabellen verursachen.

Eine mögliche Lösung ist, mehrere Connectors zu verwenden. Dies scheint möglich zu sein (siehe z. B. diesen nützlichen Blog) und kann auch dazu beitragen, das Limit für die Verarbeitung von 7.000 Ereignissen pro Sekunde zu beseitigen. Allerdings würde es wahrscheinlich nur funktionieren, wenn sich die Tabellen zwischen den Connectors nicht überschneiden, und Sie müssten mehrere Replication-Slots haben, damit es funktioniert (es gibt eine Konfigurationsoption des Connectors für slot.name).

4. Debezium PostgreSQL Connector Datenänderungs-Erfassungsereignisse mit Kafka Sink-Connectors in Elasticsearch streamen

Die endgültige Metamorphose – vom Gepard (Kafka) zum Nashorn! (Sink-System, z. B. Elasticsearch) (Quelle: Shutterstock)

Es ist natürlich nicht das Ziel, genügend Daten-Änderungsereignisse in Kafka zu erhalten und sie zu verstehen, sondern sie in ein oder mehrere Sink-Systems zu streamen.

Ich wollte jedoch eine einfache Möglichkeit haben, das komplette End-to-End-System zu testen. Insbesondere mit einem Ansatz, der keine benutzerdefinierten Kafka Connect Sink-Connectors benötigt, um komplexe Daten-Änderungsereignisse und die Semantik des Sink-Systems zu interpretieren, oder eine Schema Registry betreiben muss (was wahrscheinlich auch einen benutzerdefinierten Quellconnector erfordern würde). Deshalb habe ich die Elasticsearch Sink-Connectors aus der letzten Pipeline-Blogserie wiederverwendet. Dieser Ansatz hat sich bereits beim Lesen von JSON-Daten ohne Schema bewährt und schien daher auch für diesen Anwendungsfall ideal.

Das „erste Taxi in der Schlange“ (in Zeiten von Fahrgemeinschafts-Apps eine anachronistische Wendung) ist der Apache Camel Kafka Elasticsearch Sink-Connector. Dies war der Connector, der sich bei früheren Experimenten als am robustesten erwiesen hatte. Leider fehlte dieses Mal eine Klasse (org.elasticsearch.rest.BytesRestResponse), was ich nicht weiter zu beheben versucht habe. Wahrscheinlich wäre ich kein guter Spion, denn jeder, der sich im Spionagehandwerk auskennt, weiß, dass man nicht das erstbeste Taxi nehmen sollte, das um die Ecke kommt!

Für meinen zweiten Versuch verwendete ich einen weiteren quelloffenen Elasticsearch Sink-Connector von lenses.io, der standardmäßig im Managed Kafka Connect-Dienst von Instaclustr enthalten ist.

Hier eine Beispielkonfiguration für diesen Connector (geben Sie die IP-Adressen von Kafka Connect und Elasticsearch sowie Benutzernamen und Passwörter an):

curl https://KC_IP:8083/connectors/elastic-sink-tides/config -k -u KC_user:KC_password -X PUT -H 'Content-Type: application/json' -d '
{
    "connector.class" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector",
    "tasks.max" : 100,
    "topics" : "test1.public.test1",
    "connect.elastic.hosts" : "ES_IP",
    "connect.elastic.port" : 9201,
    "connect.elastic.kcql" : "INSERT INTO test-index SELECT * FROM test1.public.test",
    "connect.elastic.use.http.username" : "ES_user",
    "connect.elastic.use.http.password" : "ES_password"
  }
}'

Der Task wurde korrekt ausgeführt. Beachten Sie, dass wir für die Verarbeitung von 7.000 Ereignissen pro Sekunde mehrere Sink-Connector-Tasks benötigen, und Sie müssen auch die Anzahl der Kafka-Partitionen entsprechend erhöhen (Partitionen >= Tasks).

Eine Einschränkung dieser Connector-Konfiguration besteht darin, dass sie alle Ereignisse als insert-Ereignisse verarbeitet. Unsere Daten-Änderungsereignisse können jedoch before– und after-Felder haben, von denen er nichts weiß, wodurch Sie „Junk“ im Elasticsearch-Index erhalten, den Sie anschließend interpretieren müssen. Eine einfache Lösung ist die Verwendung einer SMT (Single Message Transformation) auf dem Sink-Connector, um nur die after-Felder zu extrahieren. Ich habe den ExtractNewRecordState SMT zum „Abflachen der Ereignisse“ verwendet. Hier ist die endgültige Konfiguration des Debezium PostgreSQL-Quellconnectors einschließlich des SMT:

curl https://KC_IP:8083/connectors -X POST -H 'Content-Type: application/json' -k -u kc_user:kc_password -d '{
  "name": "debezium-test1",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg_ip",
    "database.port": "5432",
    "database.user": "pg_user",
    "database.password": "pg_password",
    "database.dbname" : "postgres",
    "database.server.name": "test1",
    "plugin.name": "pgoutput",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type":     "io.debezium.transforms.ExtractNewRecordState"
      }
}
'

Um schließlich alles zu überprüfen, habe ich meine ursprünglichen NOAA-Pipeline-Daten und -Themen wiederverwendet. Dadurch konnte ich auch prüfen, ob die JSON-Daten wie erwartet in Elasticsearch indiziert wurden (obwohl ich dieses Mal nur Standard-Zuordnungen verwendet habe), und ich konnte auch prüfen, ob es einen Unterschied im Durchsatz zwischen PostgreSQL JSON- und JSONB-Lesevorgängen (mit einem GIN-Index) gab – ich freue mich, berichten zu können, dass es keinen gab.

Wie ich jedoch in der Pipeline-Blogserie Teil 8 entdeckte, haben Elasticsearch Sink-Connectors Schwierigkeiten, mehr als 1.800 Einfügungen pro Sekunde zu indizieren, was weit unter der Task-Beschränkung bei Single-Thread-Connectors von 7.000 Ereignissen pro Sekunde liegt (in Teil 9 haben wir jedoch mit einem Workaround und der BULK-API bessere Ergebnisse erzielt), womit jeglicher Unterschied zwischen JSON- und JSONB-Performance eventuell maskiert wird, aber das ist sicherlich nicht das Haupt-Performanceproblem.

5. Fazit

In diesem Blog haben wir erfolgreich eine Test-CDC-Pipeline von PostgreSQL zu einem Beispiel-Sink-System (z. B. Elasticsearch) unter Verwendung des Debezium PostgreSQL Connectors und des Instaclustr Managed Kafka Connect und OpenDistro Elasticsearch-Dienst bereitgestellt, konfiguriert und ausgeführt. Für viele Anwendungsfälle werden Sie komplexere Kafka Sink-Connectors benötigen, um die Semantik des Daten-Änderungsereignisses und ihre Anwendung auf verschiedene Sink-Systems zu interpretieren, und es gibt noch viele weitere Konfigurationsoptionen, die ich nicht berücksichtigt habe. In Anbetracht der potenziellen Einschränkungen im Single-Task-Betrieb und anderer potenzieller Eigenarten in Bezug auf die Performance sollten Sie ebenfalls einen geeigneten Leistungs- und Verzögerungstest mit realistischen Daten und angemessen dimensionierten Systemen durchführen, bevor Sie in die Produktion gehen.

Hinweis: Die Experimente in diesem Blog wurden in einer Entwicklungsumgebung durchgeführt. Dabei wurde eine Kombination aus Open-Source-/selbstverwaltetem PostgreSQL (nicht unser verwalteter PostgreSQL-Dienst) in Verbindung mit den verwalteten Diensten Kafka Connect und Elasticsearch von Instaclustr verwendet. Derzeit haben wir Kunden, die Debezium in einer privaten Preview für unseren verwalteten Cassandra-Dienst verwenden, aber zum Zeitpunkt der Veröffentlichung wird Debezium noch nicht für unseren verwalteten PostgreSQL-Dienst angeboten.

Der Orininalartikel stammt von Paul Brebner und wurde auf Instaclustr.com am 9. August 2022 veröffentlicht.

In diesem Blog-Beitrag fahren wir mit unserer Anwendung der Cadence Drohnenlieferung fort, inklusive einer Zusammenfassung des vollständigen Workflows und der Cadence+Kafka Integrationsmuster. Außerdem werden wir uns einige neue Cadence-Funktionen ansehen, die wir verwendet haben (Retry, Continue, Abfragen, Nebeneffekte), und wir werden eine beispielhafte Verfolgung einer Drohnenlieferung enthüllen.

1. Bewegung und Lieferung

(Quelle: Shutterstock)

Die Details der Drohnenbewegung wurden im vorherigen Blog-Beitrag erläutert, doch zusammenfassend lässt sich Folgendes festhalten: Der Drohnen-Workflow ist für die „Bewegung“ vom Stützpunkt zum Bestellstandort, für die Abholung der Bestellung, für das Fliegen zum Lieferstandort, für das Ablegen der Bestellung und das Zurückkehren zum Stützpunkt verantwortlich. Dies geschieht mit der Activity nextLeg() und wenn die Bestellung abgeholt, aber noch nicht geliefert wurde, werden Signale der Standortstatusänderung an den Bestellungs-Workflow (6) gesendet. Eine interessante Erweiterung könnte die Aktualisierung einer ETA im Bestellungs-Workflow sein. Nachdem die Drohne zum Stützpunkt zurückgekehrt ist, markiert sie die Lieferung als „abgehakt“ und signalisiert dem Bestellungs-Workflow, dass er abgeschlossen ist. Das führt dazu, dass der Bestellungs-Workflow abgeschlossen wird (7) und der Drohnen-Workflow eine neue Instanz von sich selbst startet (8).

2. Zusammenfassung der Cadence+Kafka Integrationsmuster

Nun haben wir einige Beispiel für verschiedene Möglichkeiten gesehen, wie Cadence und Kafka integriert werden können, einige davon in unserem früheren Blog-Beitrag. Wir fassen sie hier für ein leichteres Verständnis in einer Tabelle zusammen.

Musternummer Mustername Richtung Zweck Merkmale Beispiele
1 Senden einer Nachricht an Kafka Cadence → Kafka Senden einer einzelnen Nachricht an Kafka-Thema, simple Notification (no response) Umschließt einen Kafka-Producer in einer Cadence-Activity Blog 2, Blog 4
2 Anfrage/    Antwort von Cadence an Kafka Cadence → Kafka → Cadence Wiederverwenden des Kafka-Microservice von Cadence, Anfragen und Antworten an Kafka Umschließt einen Kafka-Producer in einer Cadence-Activity, um eine Nachricht und Header-Metadaten als Antwort zu senden; Kafka-Consumer verwendet Metadaten, um dem Cadence-Workflow oder der Activity mit dem Ergebnis zu signalisieren, dass fortgefahren werden kann Blog 2
3 Starten eines neuen Cadence-Workflows von Kafka Kafka → Cadence Starten des Cadence-Workflows vom Kafka-Consumer als Reaktion auf das Empfangen eines neuen Datensatzes Kafka-Consumer startet das Ausführen der Cadence-Workflow-Instanz, eine Instanz pro empfangenem Datensatz Blog 4
4 Erhalten des nächsten Jobs aus einer Warteschlange Cadence → Kafka → Cadence Jede Workflow-Instanz erhält einen einzelnen Job von einem Kafka-Thema zum Verarbeiten Die Cadence-Activity umschließt einen Kafka-Consumer, der das Kafka-Thema kontinuierlich abruft, nur einen einzelnen Datensatz zurückgibt, transient für die Dauer der Activity Blog 4

3. Neue verwendete Cadence-Merkmale

Wir haben einige neue Funktionen in die Demo-Anwendung der Drohnenlieferung eingebaut, die ich in diesem Blog-Beitrag näher beleuchten möchte.

3.1 Cadence-Retry

(Quelle: Shutterstock)

Cadence-Activities und -Workflows können aus unterschiedlichen Gründen fehlschlagen, unter anderem wegen Zeitüberschreitungen. Retries können manuell verarbeitet werden, oder Sie lassen den Cadence-Server den Retry handhaben, was wesentlich einfacher ist. Sie finden die Cadence-Dokumentation zu Retries hier und hier. Für meine Drohnenlieferung-Anwendung habe ich festgelegt, dass Retries für meine Activities verwendet werden sollen. Der Grund dafür ist, dass sie das Potenzial haben, „lange“ zu laufen, und wenn sie fehlschlagen, können sie wieder aufgenommen und dort fortgesetzt werden, wo sie abgebrochen wurden. Die waitForOrder()-Activity ist eine blockierende Activity, die erst dann zurückkehrt, wenn sie einen Auftrag erhalten hat, der geliefert werden muss, sodass sie potenziell eine unbestimmte Zeit dauern kann. Außerdem ist sie unabhängig, was bedeutet, dass sie problemlos mehrmals aufgerufen werden kann (wie wir oben gesehen haben, handelt es sich tatsächlich um einen Kafka-Consumer, der ein Thema abruft). Die nextLeg()-Activity ist dafür verantwortlich, den Weg der Drohne von einem Ort zum anderen aufzuzeichnen, was viele Minuten in Anspruch nehmen kann. Wenn sie fehlschlägt, wollen wir, dass sie wiederaufgenommen wird, aber nicht vom aktuellen Standort (die neueste Version verwendet jetzt eine Abfragemethode, um den aktuellen Drohnenstandort zu Beginn der Activity abzurufen). Die einfachste Möglichkeit, die Optionen für einen Retry festzulegen, ist eine @MethodRetry-Annotation im Activity-Interface wie in diesem Beispiel. Die Einstellungen einschließlich Zeiten richtig abzurufen, kann jedoch eine Herausforderung sein, und ich habe diese nur „geraten“:

@MethodRetry(maximumAttempts = 3, initialIntervalSeconds = 1, expirationSeconds = 300, maximumIntervalSeconds = 30)
    	void nextLeg(LatLon start, LatLon end, boolean updateOrderLocation, String orderID);

3.2 Cadence „Continue“

Sie könnten versucht sein, einen Cadence-Workflow ewig laufen zu lassen – schließlich sind sie für lang laufende Prozesse konzipiert. Dies wird jedoch im Allgemeinen als „schlechte Praxis“ angesehen, da die Größe des Workflow-Status immer weiter ansteigt, möglicherweise die maximale Statusgröße überschreitet und die Neuberechnung des aktuellen Status von Workflows aus dem Statusverlauf verlangsamt. Ich habe folgende einfache Lösung verwendet: Am Ende des vorhandenen Workflows wird mit Workflow.continueAsNew() ein neuer Drohnen-Workflow gestartet. Dadurch beginnt eine neue Workflow-Instanz mit derselben Workflow-ID, aber nicht mit demselben Status wie der ursprüngliche Workflow. Das bedeutet, dass Drohnen-Workflows am Stützpunkt beginnen, voll aufgeladen sind und noch keine Bestellung zum Abholen haben.

3.3 Cadence-Abfragen

Im 2. Cadence-Blog-Beitrag haben wir gezeigt, dass Workflows eine Schnittstellenklasse definieren müssen und dass die Schnittstellenmethoden Annotationen haben können, einschließlich @WorkflowMethod (der Einstiegspunkt in einen Workflow, exakt eine Methode muss diese Annotation haben) und @SignalMethod (eine Methode, die auf externe Signale reagiert, null oder mehr Methoden können diese Annotation haben).

Es gibt jedoch eine weitere Annotation, die wir nicht verwendet haben: @QueryMethod. Diese Annotation zeigt eine Methode an, die auf synchrone Abfrageanfragen reagiert, und von denen kann es viele geben. Sie dienen dazu, anderen Workflows usw. den Status anzuzeigen, sodass es sich im Grunde um eine „Getter“-Methode handelt.  Als Beispiel ist hier eine vollständige Liste von Methoden, einschließlich einiger Abfragemethoden für den OrderWorkflow:

public interface OrderWorkflow {
@WorkflowMethod(executionStartToCloseTimeoutSeconds = (int)maxFlightTime, taskList = orderActivityName)
String startWorkflow(String name);
@SignalMethod
void updateGPSLocation(LatLon l);
@SignalMethod
void signalOrder(String msg);
@SignalMethod
void updateLocation(String loc);
@QueryMethod
String getState();
@QueryMethod
LatLon getOrderLocation();
@QueryMethod
LatLon getDeliveryLocation();
    }

Abfragen müssen schreibgeschützt und nicht blockierend sein. Ein wesentlicher Unterschied zwischen Signalen und Abfragen, der mir aufgefallen ist, besteht darin, dass Signale nur bei nicht abgeschlossenen Workflows funktionieren, während Abfragen sowohl bei nicht abgeschlossenen als auch bei abgeschlossenen Workflows funktionieren. Bei abgeschlossenen Workflows werden die Workflows automatisch neu gestartet, sodass ihr Endzustand wiederhergestellt werden kann. Das ist clever!

3.4 Cadence-Nebeneffekte

Der Cadence-Workflow-Code muss im Allgemeinen deterministisch sein (Quelle: Shutterstock)

Ein letzter Trick, den ich bei dieser Anwendung genutzt habe, ist die Verwendung von Zufallszahlen zur Berechnung der Bestell- und Lieferorte in der Funktion newDestination(). Um diese im Order Activity-Workflow korrekt zu verwenden, musste ich Cadence mit der Methode Workflow.sideEffect()
sagen, dass er Nebeneffekte hat. So können Workflows die bereitgestellte Funktion einmal ausführen und das Ergebnis im Workflow-Verlauf speichern. Das Ergebnis des aufgezeichneten Verlaufs wird zurückgegeben, ohne dass die angegebene Funktion während der Wiedergabe ausgeführt wird. Dies garantiert die deterministische Anforderung für Workflows, da bei der Wiedergabe genau das gleiche Ergebnis zurückgegeben wird. Hier ist mein Beispiel aus der startWorkflow()-Methode des Bestellungs-Workflows (oben):

startLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(baseLocation, 0.1, maxLegDistance));

deliveryLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(startLocation, 0.1, maxLegDistance));

Der Code ist in unserem Github-Repository verfügbar. Im nächsten Blog-Beitrag in dieser Reihe werden wir die Skalierbarkeit von Cadence näher betrachten und herausfinden, wie viele Drohnen gleichzeitig fliegen können.

Anhang: Beispielhafte Verfolgung

Wenn alles nach Plan verläuft, sieht eine typische (gekürzte) Drohnen- und Bestellungs-Workflow-Verfolgung so aus:

Order WF readyForDelivery activity order_0 id 939328c5-eaad-4f52-9aaf-08100fde1e84
waitForOrder got an order! topic = orderjobs2, partition = 0, offset = 17, key = , value = 939328c5-eaad-4f52-9aaf-08100fde1e84
Drone Drone_0 got an order from Kafka + 939328c5-eaad-4f52-9aaf-08100fde1e84
Drone Drone_0 has got order 939328c5-eaad-4f52-9aaf-08100fde1e84
Drone Drone_0: new state = gotOrder, location = base
order order_0 got signal = droneHasOrder
Drone Drone_0 has generated a flight plan based on Order and Delivery locations
Start lat -35.20586, lon 149.09462
Order lat -35.189085007724415, lon 149.09190627324634
Delivery lat -35.1849320141879, lon 149.08653974053584
End lat -35.20586, lon 149.09462
Drone Drone_0 flight plan total distance (km) = 4.9930887486783835
Drone Drone_0 estimated total flight time (h) = 0.24965443743391919
Drone Drone_0 distance to order (km) = 1.881431511294941
Drone Drone_0 estimated time until order pickup (h) = 0.09407157556474705
Drone Drone_0 distance to delivery (km) = 2.5530367293280447
Drone Drone_0 estimated time until delivery (h) = 0.12765183646640224
Drone Drone_0: new state = flightPlanGenerated, location = base
Drone + Drone_0 flying to pickup Order
Drone Drone_0: new state = flyingToOrder, location = betweenBaseAndOrder
order order_0 got signal = droneOnWayForPickup
Drone WF gpsLocation = lat -35.20586, lon 149.09462
start loc = lat -35.20586, lon 149.09462
Drone flew to new location = lat -35.20536523834301, lon 149.09453994515312
Distance to destination = 1.825940473393331 km
Drone Drone_0 gps location update lat -35.20536523834301, lon 149.09453994515312
Drone Drone_0 charge now = 99.44444444444444%, last used = 0.5555555555555556
Drone WF gpsLocation = lat -35.20536523834301, lon 149.09453994515312
start loc = lat -35.20586, lon 149.09462
Drone flew to new location = lat -35.20487047663333, lon 149.09445989128173
Distance to destination = 1.770449431350198 km
Drone Drone_0 gps location update lat -35.20487047663333, lon 149.09445989128173
Drone flew to new location = lat -35.204375714870956, lon 149.0943798383858
Distance to destination = 1.714958384759105 km
Drone Drone_0 charge now = 98.88888888888889%, last used = 0.5555555555555556
...
Drone flew to new location = lat -35.189085007724415, lon 149.09190627324634
Distance to destination = 0.0 km
Drone Drone_0 gps location update lat -35.189085007724415, lon 149.09190627324634
Drone arrived at destination.
Drone Drone_0 charge now = 81.11111111111106%, last used = 0.5555555555555556
Drone Drone_0 picking up Order
Drone Drone_0: new state = pickingUpOrder, location = orderLocation
Drone Drone_0 picking up Order!
Drone Drone_0 picked up Order!
order order_0 got signal = pickedUpByDrone
Drone Drone_0 charge now = 77.77777777777773%, last used = 3.3333333333333335
Drone Drone_0 delivering Order...
Drone Drone_0: new state = startedDelivery, location = betweenOrderAndDelivery
Order new location = orderLocation
order order_0 got signal = outForDelivery
Order new location = onWay
Drone WF gpsLocation = lat -35.189085007724415, lon 149.09190627324634
start loc = lat -35.189085007724415, lon 149.09190627324634
Drone flew to new location = lat -35.188741877698526, lon 149.09146284539662
Distance to destination = 0.6161141689892213 km
Drone Drone_0 gps location update lat -35.188741877698526, lon 149.09146284539662
Drone Drone_0 charge now = 77.22222222222217%, last used = 0.5555555555555556
Drone flew to new location = lat -35.18839874605641, lon 149.09101942129195
Distance to destination = 0.5606231205354245 km
Order  GPS Location lat -35.188741877698526, lon 149.09146284539662
...
Drone flew to new location = lat -35.1849320141879, lon 149.08653974053584
Distance to destination = 0.0 km
Drone Drone_0 gps location update lat -35.1849320141879, lon 149.08653974053584
Drone Drone_0 charge now = 70.55555555555549%, last used = 0.5555555555555556
Drone arrived at destination.
Order GPS Location lat -35.1849320141879, lon 149.08653974053584
Drone + Drone_0 dropping Order!
Drone Drone_0: new state = droppingOrder, location = deliveryLocation
Drone + Drone_0 dropped Order!
Order  new location = deliveryLocation
Drone Drone_0 charge now = 67.22222222222216%, last used = 3.3333333333333335
Drone Drone_0 returning to Base
Drone Drone_0: new state = returningToBase, location = betweenDeliveryAndBase
order order_0 got signal = delivered
Drone WF gpsLocation = lat -35.1849320141879, lon 149.08653974053584
start loc = lat -35.1849320141879, lon 149.08653974053584
Drone flew to new location = lat -35.1854079590788, lon 149.08672345348626
Distance to destination = 2.384560977495208 km
Drone Drone_0 gps location update lat -35.1854079590788, lon 149.08672345348626
Drone flew to new location = lat -35.18588390369231, lon 149.08690716858857
Distance to destination = 2.3290699362110967 km
Drone Drone_0 charge now = 66.6666666666666%, last used = 0.5555555555555556
...
Drone flew to new location = lat -35.20586, lon 149.09462
Distance to destination = 0.0 km
Drone Drone_0 charge now = 43.3333333333332%, last used = 0.5555555555555556
Drone Drone_0 gps location update lat -35.20586, lon 149.09462
Drone arrived at destination.
Drone Drone_0 charge now = 42.777777777777644%, last used = 0.5555555555555556
Drone + Drone_0 returned to Base!
Drone Drone_0: new state = backAtBase, location = base
Drone Drone_0: new state = checkOrder, location = base
Drone Drone_0: new state = droneDeliveryCompleted, location = base
Drone Drone_0: new state = charging, location = base
Drone Drone_0 charging! charging time = 515s
Drone Drone_0: new state = charged, location = base
order order_0 got signal = orderComplete
Order WF exiting!

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

In Teil 3 meiner Cadence-Blogserie habe ich die Drone Delivery Demo-Anwendung vorgestellt und mich dabei auf den Drohnen-Workflow konzentriert. In diesem Beitrag werden wir uns Drone Delivery aus der Perspektive der Bestellungs-Workflows ansehen, erfahren, wie die Drohnen- und Bestellungs-Workflows miteinander interagieren, einige zusätzliche Cadence+Kafka-Integrationsmuster entdecken und einige neue Cadence-Funktionen (z. B. Neuversuche, Nebeneffekte, Abfragen und Als neu fortfahren) genauer betrachten.

1. Die Architektur der Drone Delivery-Anwendung

Damit wir tiefer in die Drone Delivery-Anwendung eintauchen können, verwenden wir dieses übersichtliche Architekturdiagramm, um den Aufbau besser verstehen zu können. Die oberen 2 Reihen sind die Apache Kafka®-Komponenten, und die unteren 2 Reihen sind die Cadence-Workflows. Ich habe die Workflow-Schritte aus Gründen der Übersichtlichkeit vereinfacht. In den unteren Reihen wird jeder Drohnen-Workflow einer physischen Drohne zugeordnet, wobei die Schritte den Ereignissen im Lebenszyklus der Drohnenlieferung entsprechen.

Aus der Bestellungsperspektive spiegeln die nummerierten Schritte die Ereignisse im Lebenszyklus von Bestellungen/Lieferungen wie folgt wider:

  1. Neue Bestellung erstellen
  2. Neuen Bestellungs-Workflow erstellen
  3. Bestellung bereit für Lieferung
  4. Drohne wartet auf Bestellung
  5. Drohne hat Bestellung zugeordnet
  6. Bestellung wurde von Drohne aufgenommen und befindet sich in Auslieferung, Standort wird während des Flugs aktualisiert
  7. Bestellung geliefert und geprüft, Bestellung abgeschlossen

Sehen wir uns nun den Bestellablauf genauer an.

2. Neue Bestellung

Ein Kunde bestellt etwas über die Drone Delivery App und löst so den Bestell- und Drohnenlieferprozess aus (Quelle: Shutterstock)

Der erste Schritt im Lebenszyklus einer Bestellung besteht darin, dass ein Kunde etwas über eine Drohnenlieferungs-App bestellt. Wir gehen davon aus, dass dies ein Ereignis „Neue Bestellung erstellen“ auslöst, das in das Kafka-Topic „Neue Bestellungen“ gestellt wird. Dies bringt uns zum ersten unserer neuen Cadence+Kafka-Integrationsmuster, dem unten stehenden Muster „Neuen Cadence-Workflow von Kafka starten“ (1).

Dieses Muster ist einfach. Ein unabhängiger Kafka-Consumer läuft ständig und holt die nächste Bestellung aus dem Topic „Neue Bestellungen“ ab. Mithilfe eines Cadence-Clients erstellt und startet er eine neue Bestellungs-Workflow-Instanz. Hier ist ein beispielhafter Code dafür:

WorkflowClient workflowClient =
                WorkflowClient.newInstance(
                        new WorkflowServiceTChannel(ClientOptions.newBuilder().setHost(host).setPort(7933).build()),
                        WorkflowClientOptions.newBuilder().setDomain(domainName).build());

        Properties kafkaProps = new Properties();

        try (FileReader fileReader = new FileReader("consumer2.properties")) {
            kafkaProps.load(fileReader);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // uses a unique group
        kafkaProps.put("group.id", "newOrder");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            consumer.subscribe(Collections.singleton(newordersTopicName));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                	System.out.print("Consumer got new Order WF creation request! ");
                    System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                    
                    String orderName = record.value().toString();
                    
                    OrderWorkflow orderWorkflow = workflowClient.newWorkflowStub(OrderWorkflow.class);
                    System.out.println("Starting new Order workfow!");
                	WorkflowExecution workflowExecution = WorkflowClient.start(orderWorkflow::startWorkflow, orderName);
                    System.out.println("Started new Order workfow! Workflow ID = " + workflowExecution.getWorkflowId());
                    }
                }
            }
        }

Dies ist also ein Beispiel für die Ausführung von Cadence-Code in einem Kafka-Consumer. Etwas Ähnliches haben wir bereits in Teil 2 gesehen, wo wir ein Signal an einen laufenden Cadence-Workflow in einem Kafka-Consumer gesendet haben. Der Unterschied besteht darin, dass wir in diesem Beispiel einen Cadence-Workflow starten.

3. Der Bestellungs-Workflow

Der Bestellungs-Workflow ist ganz einfach. Nach dem Start werden zufällige Bestellungs- und Lieferorte generiert (die garantiert innerhalb der Reichweite der Drohne liegen, damit sie auch zur Basis zurückkehren kann), in einer Activity wird eine Nachricht an Kafka gesendet, um mitzuteilen, dass die Drohne bereit für die Lieferung ist (siehe unten), der Status des Ortes wird in einer Schleife aktualisiert (die über ein Signal vom Drohnen-Workflow empfangen wird) und dann wird gewartet, bis der Status „orderComplete“ erreicht ist, um den Prozess zu beenden. Andere Activities sind denkbar, z. B. die Überprüfung auf Lieferverletzungen und das Senden von Standortaktualisierungen an Kafka zur Analyse und Zuordnung.

public static class OrderWorkflowImpl implements OrderWorkflow {
@Override
        public String startWorkflow(String name) {
        	System.out.println("Started Order workflow " + name + ", ID=" + Workflow.getWorkflowInfo().getWorkflowId());
        	        	
        	// Order creates fake order and delivery locations
// randomly generated but within range of Drones
        	startLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(baseLocation, 0.1, maxLegDistance));
        	System.out.println("Order WF startLocation = " + startLocation.toString());
        	
          	deliveryLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(startLocation, 0.1, maxLegDistance));
        	System.out.println("Order WF deliveryLocation = " + deliveryLocation.toString());

        	// A real activity - request a drone - wraps a Kafka producer
        	activities.readyForDelivery(name);
        	boolean delivered = false;      	
          	String endState = "orderComplete";
        	
        	while (!delivered)
        	{
          		Workflow.await(() -> newState != "");
        		System.out.println("order " + name + " got signal = " + newState);
        		updates.add(newState);
        		if (newState.equals(endState))
        		{
        			delivered = true;
        			System.out.println("Order WF exiting!");
        		}
          		lastState = newState;
        		newState = "";      		
          	} 	
        	return "Order " + name + " " + endState;	
        }
}

Wir werden die Nebeneffekte unten erklären.

4. Die Drohne erhält die nächste Bestellung zur Lieferung

Ein Taxistand modelliert Menschen (Bestellungen), die in der Schlange auf das nächste verfügbare Taxi (Drohne) warten. (Quelle: Shutterstock)

Sobald die Bestellung zur Abholung bereit ist (möglicherweise nach einer Verzögerung aufgrund der Vorbereitungszeit für die Bestellung), sind wir bereit für die entscheidende Koordination zwischen den Workflows der Drohne und der Bestellung unter Verwendung des Cadence+Kafka-Musters „nächsten Auftrag aus einer Warteschlange holen“ (2, 3).

Wir erhoffen uns von dieser Interaktion, dass (a) die Drohnen bereit sind, eine Bestellung auszuliefern, (b) die Bestellungen zur Auslieferung bereit sind, (c) genau eine Bestellung genau einer Drohne zugewiesen wird. Das heißt, wir wollen nicht, dass sich Drohnen um Bestellungen „streiten“, dass Drohnen versuchen, mehr als eine Bestellung auszuliefern, oder dass Bestellungen, denen keine Drohne zugewiesen wird, nicht ausgeführt werden. (a) und (b) können in beliebiger Reihenfolge auftreten, und es können jederzeit 0 oder mehr Drohnen oder Bestellungen bereitstehen.

Wie funktioniert dieses Muster in der Praxis? Es besteht tatsächlich aus zwei Cadence+Kafka-Untermustern.

Das erste Muster (2) ist eine einfache Ein-Wege-Benachrichtigung von Cadence an Kafka. Der Bestellungs-Workflow hat eine Activity, readyForDelivery(), die einen Kafka-Producer umschließt. Dies ist ein Fernaufruf, der fehlschlagen kann. Deshalb habe ich eine Cadence Activity verwendet, obwohl sie nicht lange läuft und nicht auf eine Antwort wartet, anders als das Cadence+Kafka-Microservices-Muster, das wir in Blog 2 demonstriert haben, das eine Benachrichtigung sendet und dann auf eine Antwort von Kafka wartet. Der Producer sendet die ID der Bestellung an das Topic „Bestellungen bereit“ und dann blockiert der Bestellungs-Workflow, während er mit Workflow.await() auf ein Signal von einer Drohne wartet, das besagt, dass die Bestellung abgeholt wurde.

Aber wie nimmt die Drohne eine vorbereitete Bestellung an? Hier kommt das zweite Muster ins Spiel (3). Der Drohnen-Workflow hat eine Activity „Warten auf Bestellung“ (3). Diese umschließt einen Kafka-Consumer (3a), der tatsächlich im Cadence Activity-Thread läuft. Er ist also vorübergehend und dauert nur so lange, wie die Activity läuft. Der Workflow fragt das Topic „Bestellungen bereit“ ab, bis eine einzelne Bestellung zurückgegeben wird (3b), wodurch die Activity abgeschlossen wird (3c).

Kafka-Consumer werden für diesen Anwendungsfall etwas anders als normalerweise verwendet. Es gibt genau einen Consumer pro Drohnen-Workflow im Zustand „Warten auf Bestellung“. Der Consumer fragt das Topic so lange ab, bis ein einziger Datensatz zurückgegeben wird, und wird dann beendet. Um sicherzustellen, dass nur 1 Bestellung zurückgegeben wird, haben wir max.poll.records auf 1 gesetzt.

Alle diese Consumer teilen sich eine gemeinsame Consumer-Gruppe, sodass die Bestellungen auf alle wartenden Drohnen verteilt werden, aber nur eine Drohne die jeweilige Bestellung erhalten kann. Wir verwenden keinen Kafka-Schlüssel, sodass die Datensätze einfach nach dem Round-Robin-Prinzip an die Consumer geliefert werden. Es kann ein gewisser Overhead entstehen, weil Consumer regelmäßig der Gruppe beitreten und sie wieder verlassen (hauptsächlich Verzögerung durch Neuverteilung). Und wenn die Anzahl der Drohnen steigt, muss die Anzahl der Partitionen des Topics erhöht werden, um sicherzustellen, dass es genügend Partitionen für die Anzahl der Consumer gibt. Die Regel lautet: Partitionen >= Consumer. Sie könnten versucht sein, die Anzahl der Partitionen zu Beginn sehr hoch anzusetzen, aber frühere Experimente haben gezeigt, dass zu viele Partitionen den Durchsatz des Kafka-Clusters verringern können und dass es eine optimale Anzahl von Partitionen gibt, die von der Größe des Clusters abhängt (<= 100 Partitionen ist für den normalen Betrieb in Ordnung). Wenn Sie mehr Drohnen haben, erhöhen Sie einfach die Größe Ihres Kafka-Clusters, um damit Schritt zu halten. Hier ist die Implementierung der waitForOrder() Activity:

public static class DroneActivitiesImpl implements DroneActivities
{
public String waitForOrder(String name) {
        	        	 
        	 // Kafka consumer that polls for a new Order that's been created and is ready for pickup to trigger Drone delivery trip
        	 // Each Drone can only have 1 order at a time, and each order can only be delivered by 1 drone (or drone wars may result)
          	 Properties kafkaProps = new Properties();

             try (FileReader fileReader = new FileReader("consumer2.properties")) {
                 kafkaProps.load(fileReader);
             } catch (IOException e) {
                 e.printStackTrace();
             }
             
             // set max.poll.records to 1 so we onlyu get 1 order at time. 
             // All consumers waiting for order are in their own shared consumer group
             // NOTE that this means we need partitions >= number of Drones - assumption is this is < 100 for performance reasons
             kafkaProps.put("group.id", "waitForOrder");
             kafkaProps.put("max.poll.records", "1");
             
             try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
                 consumer.subscribe(Collections.singleton(orderjobsTopicName));
                 while (true) {
                     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                     for (ConsumerRecord<String, String> record : records) {
                         System.out.print("waitForOrder got an order! ");
                         System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %s",
                                 record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                         // ensure that we don't get this order again
                         consumer.commitAsync();
                         return record.value().toString();
                     }
                 }
             }
             catch (Exception e)
             {
            	 e.printStackTrace();
             }
			return "";
         }
}

Der Code ist in unserem Github-Repository verfügbar.

Das soll es für diesen Blog-Beitrag gewesen sein. Im nächsten Teil werden wir mit einer Zusammenfassung der verwendeten Cadence- & Kafka-Integrationsmuster fortfahren und einige der neuen Cadence-Funktionen genauer betrachten.

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

Lassen Sie uns eine neue und komplexere Cadence-Anwendung für Drohnen-Lieferungen erstellen! Dies ist der erste Teil einer mehrteiligen Reihe über Cadence-Drohnen mit einer Einführung in das Problem der Drohnenlieferung, mit dem Cadence-Hauptworkflow für Drohnen und mit einem Anhang zu Drohnenbewegungen.

1. Der Himmel ist voller Drohnen (und Krähen)

(Quelle: Shutterstock)

Meine jüngste Silvesterlektüre war „Termination Shock“ von Neal Stephenson, ein Roman, der in der nahen Zukunft spielt, in der Drohnen allgegenwärtig sind, wobei es kurioserweise auch um dressierte Raubvögel geht, die Drohnen zerstören. Fakten sind häufig seltsamer als Fiktion, denn dies erinnerte mich an den Test eines Drohnen-Lieferdienstes in der Nähe von Canberra, wo ich lebe, der es jüngst wegen eines Krähen-Angriffs in die Nachrichten schaffte. Als ich den Lieferdienst selbst testen wollte, stellte sich leider heraus, dass er in meiner Gegend noch nicht angeboten wird, obwohl es viele perfekte Drohnen-Landeplätze in der Nähe gibt. Nun gut, vorerst musste also die Fiktion genügen.

Dafür beschloss ich aber, für mein nächstes Cadence-Experiment einen Drohnen-Lieferdienst aufzubauen (simuliert natürlich, auch wenn meine Garage mit mehreren Generationen ausrangierter Drohnen vollgestopft ist). Im Experiment soll der örtliche Versuch als Vorbild dienen, bei dem die Kunden bei ausgewählten teilnehmenden Geschäften kleine Artikel (z. B. Medikamente, Kaffee, Lebensmittel, Kleinteile) bestellen können. Es gibt einen zentralen Drohnenstützpunkt (ähnlich, aber viel langweiliger als das Versteck eines klassischen Film-Schurken oder Tracy Island aus der Science-Fiction-Serie „Thunderbirds Are Go!“ oder die Drohnensammelplätze der Bienenvölker), wo sich alle Drohnen befinden, wenn sie nicht gerade eine Bestellung ausliefern.

Drohnensammelplätze von Bienen (Quelle: Shutterstock)

Der Stützpunkt ist der Ort, an dem die Batterien der Drohnen aufgeladen werden. Wenn eine Bestellung versandfertig ist, fliegen die Drohnen vom Stützpunkt zum jeweiligen Geschäft, holen die Bestellung ab und fliegen zum Standort des Kunden, um die Bestellung auszuliefern. Danach fliegen sie zurück zum Stützpunkt, und der Prozess beginnt von vorn. Da die Drohnen autonom sind, kann es viele von ihnen geben. Es gibt allerdings Grenzen hinsichtlich Gewicht der Artikel, Einsatzzeiten, Bereiche, in denen geflogen werden darf oder nicht (z. B. Flugverbotszonen), dazu viele potenzielle Fehlerarten (z. B. Nichterreichen des Zielstandorts, Nichtzustellung der Bestellung, Absturz usw.) und die maximale Reichweite der Drohnenbatterie mit ausreichendem Sicherheitspuffer, um zum Stützpunkt zurückzukehren (insbesondere, wenn sie angreifenden Krähen ausweichen müssen oder vom Kurs abgebracht werden usw.). Die einzigen Faktoren, die ich zunächst berücksichtigen wollte, waren Entfernung und Flugzeit.

2. Anwendung Drohnenlieferung: Cadence Workflow-Design

Der allgemeine Drohnen-Workflow (Quelle: Shutterstock – Zusammenstellung)

Ich wollte bei meiner Demo für Drohnenlieferungen die Dinge einfach, aber trotzdem interessant halten. Daher beschloss ich, alles, was stateful ist und Aktionen oder Aufgaben haben kann, als Cadence Workflow zu implementieren. Es gibt zwei Workflow-Typen, die miteinander interagieren.

Jede Drohne ist als Workflow modelliert, wobei verschiedene Zustände durchlaufen werden: Bereit am Stützpunkt, Warten auf eine Bestellung, Fliegen zur Abholung der Bestellung, Abholen der Bestellung, Fliegen zum Zielstandort, Zustellen der Bestellung und dann Zurückfliegen zum Stützpunkt und Aufladen. Nach jedem Lieferzyklus startet der Drohnen-Workflow eine neue Instanz (mit derselben Workflow-ID, aber einer anderen Ausführungs- oder Run-ID) und ist bereit für die nächste Lieferung.

Außerdem habe ich entschieden, Bestellungen als Cadence Workflow zu modellieren. Der Grund dafür ist, dass auch Bestellungen einen Zustand und Zustandsübergänge von einem Zustand zum nächsten haben (z. B. Annahme der Bestellung, Bereit zur Abholung, Von Drohne abgeholt, Unterwegs, Geliefert, Bestellung abgeschlossen). Außerdem gibt es Aktionen. Der Bestellungs-Workflow ist beispielsweise dafür verantwortlich, zufällige Abhol- und Lieferorte für Bestellungen zu generieren, die sich innerhalb der Drohnenreichweite des Stützpunkts befinden, und anzugeben, wann die Bestellung zur Abholung bereit ist. Wir werden diesen Bestellungs-Workflow im nächsten Blog genauer unter die Lupe nehmen.

Im Folgenden sind die Hauptschritte des Cadence Workflow für Drohnen aufgeführt. Zuerst wird für jede Drohne genau eine Drohnen-Workflow-Instanz erstellt. Der Workflow modelliert den kompletten Liefervorgang einer Bestellung, einschließlich Aufladen der Drohne, und startet dann eine neue Instanz mit einer voll aufgeladenen Drohne, die für die nächste Lieferung bereit ist. Beachten Sie, dass dieser Code nur die @WorkflowMethod (entry point)-Methode für die Drohnen-Workflow-Implementierung darstellt und daher nicht vollständig ist. Der Code verwendet bei einigen Schritten von mir bereitgestellte Hilfsfunktionen, und er benötigt zur Ausführung die Workflow-Schnittstelle und Workflow-Activities. Der gesamte Code ist hier verfügbar.

// missing code

public static class DroneWorkflowImpl implements DroneWorkflow {

// missing code

@Override

    public String startWorkflow(String name) {

    

     droneName = name;

    

     System.out.println("Started Drone workflow " + name + ", ID=" + Workflow.getWorkflowInfo().getWorkflowId());

    

     // STEP 0 - ready

     // Drones always start ready, at the base location

     newStateAndLocation("ready", "base");

        

     // STEP 1 - wait for an Order

     // this step calls a real activity which blocks until an order arrives

     // returns an OrderWorkflow which we used to signal Order WF, also sets OrderID which is just a String

     orderWorkflow = step1_GetOrder();

     newStateAndLocation("gotOrder", "base");




    

        // STEP 2 - generate "flight plan" using the order and delivery locations from the Order

     // The Order WF is responsible for generating random order and delivery locations that are within Drone range

        step2_GenerateFlightPlan();

     newStateAndLocation("flightPlanGenerated", "base");




        // STEP 3 - another real activity - flying to get the order 

        System.out.println("Drone + " + name + " flying to pickup Order");

     newStateAndLocation("flyingToOrder", "betweenBaseAndOrder");

     // Let the Order WF know that the drone is on the way

        orderWorkflow.signalOrder("droneOnWayForPickup");

        // nextLeg is where the Drone movement is calculated, causing the drone to "fly" from planStart to planOrder locations

        // false and null arguments ensure that the Order location isn't updated yet, but charge is reduced

        activities.nextLeg(planStart, planOrder, false, null);

        

        // STEP 4 - arrived at order location, collect order - this takes time and uses charge to

        System.out.println("Drone + " + name + " picking up Order");

        newStateAndLocation("pickingUpOrder", "orderLocation");

        step4_pickUpOrder();

        

        // STEP 5 - flying to deliver the order   

        System.out.println("Drone + " + name + " delivering Order...");

        newStateAndLocation("startedDelivery", "betweenOrderAndDelivery");

        // next GPS location drone flies to

        nextGPSLocation = planDelivery;

        // let Order WF know the delivery has started

        orderWorkflow.signalOrder("outForDelivery");

        orderWorkflow.updateLocation("onWay");

        // drone flies to delivery location, updating drone and order locations and drone charge as it goes

        activities.nextLeg(planOrder, planDelivery, true, orderID);

        

        // STEP 6 - drop order

        System.out.println("Drone + " + name + " dropping Order!");

        newStateAndLocation("droppingOrder", "deliveryLocation");

        step6_dropOrder();

         

        // Step 7 - return to base

        System.out.println("Drone + " + name + " returning to Base");

        newStateAndLocation("returningToBase", "betweenDeliveryAndBase");

        nextGPSLocation = planEnd;

        // fly back to base, update drone location and charge, but not Order location as it's already been delivered.

        activities.nextLeg(planDelivery, planEnd, false, null);

        

        // STEP 8 - back at base

        System.out.println("Drone + " + name + " returned to Base!");

        newStateAndLocation("backAtBase", "base");

 

        // STEP 9 - check order - if successful then Order WF completes

        newStateAndLocation("checkOrder", "base");

        step9_checkOrder();  

        

        // Step 10 - delivery complete

        newStateAndLocation("droneDeliveryCompleted", "base");

              

        // Step 11 - charge

        newStateAndLocation("charging", "base");

        step11_recharge();

        

        // Step 12 - fully recharged

        newStateAndLocation("charged", "base");

        

        System.out.println("Starting new Drone delivery WF with coninueAsnew with same WF ID!");

        

        Workflow.continueAsNew(name);

        

     return "Drone Delivery Workflow " + name + " completed!";

    }

}

3. Anwendungsfall für Cadence: Drohnenlieferung von A nach B

Drohnenlieferung von A nach B (Quelle: Shutterstock)

Mit dem obigen Code und dieser Abbildung scheinen Drohnenlieferungen von einem Standort zum anderen eine einfache Sache zu sein. In der Praxis ist die Sache jedoch etwas komplexer, was mich motiviert hat, bei einigen Workflow-Schritten Cadence-Activity zu verwenden.

Der Standort ist bei Drohnenlieferungen der wichtigste Faktor. Ich habe entschieden, Koordinaten mit Längen- und Breitengrad (als Dezimalwerte) mit einer Auflösung von 1 m zu verwenden. Jede Drohne hat ein „GPS“, mit dem die Position der Drohne verfolgt wird, und die Workflow-Instanz der Drohne hat einen Standort-Zustand. Drohnen starten am Stützpunkt-Standort und kehren (hoffentlich) wieder dorthin zurück. Bestellungen und deren zugehörige Workflow-Instanzen haben außerdem einen Standort-Zustand. Dieser umfasst den Standort der Abholung, den Standort der Zustellung und den eigentlichen Standort (aktuelle Position) der Bestellung, der während des Transports der Bestellung durch die Drohne aktualisiert wird. Dies ist wichtig, da der Drohnen-Lieferdienst und das Geschäft jederzeit wissen möchten, wo sich die Bestellung befindet. Weiterhin können so auch die Kunden verfolgen, wann ihre Lieferung eintrifft, und diese dann von der Drohne in Empfang nehmen (und vor verärgerten Krähen retten).

Hier ist die Karte mit einem Beispiel für eine komplette Drohnenlieferung:

Bevor sich die Piraten auf Schatzsuche begeben, lassen sie sich aus dem Dorf noch eine Mahlzeit liefern! (Quelle: Shutterstock)

Aber wie kommt eine Drohne von einem Standort zum anderen? Ich habe ein einfaches Drohnenmathematik-Package DroneMaths geschrieben, das bei den Berechnungen hilft. Wenn Sie an Details interessiert sind, wie wir die Drohnenbewegungen von einem Standort zum anderen berechnen, werfen Sie einen Blick in den Anhang unten. Die Funktion DroneMaths.nextPosition(start, end, speed, time) berechnet die nächste Position der Drohne unter Verwendung des Startstandorts (start), des Zielstandorts (end) und von Geschwindigkeit (speed) und Flugzeit (time), was dann in der nextLeg()-Activity verwendet wird.

4. Cadence Workflow Activity

Die nextLeg()-Activity ist für die Simulation der Drohnenbewegung in jedem Teilabschnitt des Lieferflugs und für das Stoppen bei Ankunft am Ziel verantwortlich. Die Activity aktualisiert die Drohnenposition und den Ladestand der Drohnenbatterie und optional den Bestellstandort während des Flugs. Die Activity kehrt zurück, wenn die Drohne angekommen ist. Wir verwenden dafür eine Cadence Activity, da diese potenziell über längere Zeit ausgeführt wird, durch Verwendung von DroneMaths rechenintensiv ist und daher in einem vom Drohnen-Workflow separaten Thread ausgeführt werden sollte. Und sie kann potenziell fehlschlagen (wie bei einer echten Drohne, d. h. einer nicht simulierten, wirklich fliegenden Drohne, die mit der realen Welt interagieren muss, was sie zu einem guten Beispiel für eine nicht deterministische Activity macht). In diesem Fall soll sichergestellt sein, dass die Activity neu gestartet und die Drohnenbewegung von dem Punkt aus fortgesetzt wird, wo die Unterbrechung stattfand. Wir werden diesen Aspekt aber erst im nächsten Blog berücksichtigen, wenn wir uns eingehender mit der Behandlung von Ausnahmen befassen.

public static class DroneActivitiesImpl implements DroneActivities

{




// “Fly” from start to end location, return when drone arrives 

// Update Drone location and charge, and Order Location only if required.

public void nextLeg(LatLon start, LatLon end, boolean updateOrderLocation, String orderID) {         

         WorkflowExecution execution = Activity.getWorkflowExecution();

         String id = execution.getWorkflowId();  

DroneWorkflow droneWF = workflowClient.newWorkflowStub(DroneWorkflow.class, id);

 

OrderWorkflow orderWF = workflowClient.newWorkflowStub(OrderWorkflow.class, orderID);  

LatLon here = start;

       

         while (true)

      {

         try {

Thread.sleep((int)(moveTime * 1000 * timeScale));

} catch (InterruptedException e) { e.printStackTrace();

return;

}

         

      LatLon next = DroneMaths.nextPosition(here, end, droneSpeed, moveTime);

      here = next;

      System.out.println("Drone flew to new location = " + here.toString());

      double distance = DroneMaths.distance(here, end);

      System.out.println("Distance to destination = " + distance + " km");

      // update drone location and charge         droneWF.updateGPSLocation(here);

      droneWF.updateCharge(moveTime);

      // only update Order location if drone is transporting it

      if (updateOrderLocation)

      orderWF.updateGPSLocation(here);

     

      // check if we have arrived within 1m

      if (end.sameLocation(here))

    {

System.out.println("Drone arrived at destination.");

      return;

      }

      }

         }

}

Sie werden bemerken, dass ich im Activity-Code den allgemeinen Thread.sleep()-Aufruf verwendet habe. Das ist in Ordnung, da eine Cadence Activity (im Gegensatz zu einem Cadence Workflow) beliebigen Code verwenden kann. Allerdings habe ich bei den Activities eine Einschränkung entdeckt. Es hat sich herausgestellt, dass die Argumente und Rückgabewerte einer Activity-Methode mit dem bereitgestellten DataConverter in ein Byte-Array serialisiert werden müssen (die Standardimplementierung verwendet einen JSON-Serialisierer). Daher musste ich an die Methode einen „String OrderID“-Wert übergeben und damit eine neue OrderWorkflow-Instanz erstellen, anstatt nur einen OrderWorkflow zu übergeben (der nicht serialisierbar ist).

Abschließend gibt es noch einen DroneWorkflow-Schritt step1_GetOrder(), der entscheidend für den Fortschritt des Drohnen-Workflows ist. Dieser Schritt wartet, bis eine Bestellung zur Lieferung bereit ist, und legt dann die Bestell-ID und eine OrderWorkflow-Instanz im Hauptworkflow fest. Diese Hilfsmethode ist eigentlich ein Wrapper für eine weitere Activity, die blockiert, bis eine Bestellung zur Abholung durch eine Drohne bereit ist, und die garantiert, dass jede abholbereite Bestellung von genau einer Drohne abgeholt wird. Wie funktioniert das? Im Grunde ist dies ein weiteres Beispiel für ein Cadence+Kafka-Integrationsmuster, das wir im nächsten Blog zusammen mit einigen weiteren Kafka+Cadence-Mustern untersuchen werden (einschließlich Starten eines Cadence Workflow mit Kafka, das auch beim Bestellungs-Workflow verwendet wird). Im nächsten Blog werden wir außerdem die gesamte Lösung analysieren, einschließlich des Bestellungs-Workflows, sowie weitere Cadence-Funktionen wie Abfragen, Neuversuche, Heartbeats, Als neu fortfahren und Nebeneffekte.

P.S. Hat der Drohnen-Lieferdienst zufällig von meinem Projekt gehört? Finde gerade eine Broschüre in meinem Briefkasten (ja, ziemlich „old school“, aber vielleicht per Drohne zugestellt?), die den Start des Lieferdienstes in meiner Gegend ankündigt – immer her mit den Drohen-gelieferten Sachen! (Ich habe auch festgestellt, dass deren Drohnen viel schneller sind als meine – ich werden die Durchschnittsgeschwindigkeit auf 100 km/h erhöhen müssen, um mithalten zu können).

5. Anhang – Drohnenbewegung: Standort, Entfernung, Richtung, Geschwindigkeit und Ladung!

Wie bewegt sich eine Drohne von einem Standort zum anderen? Ich habe ein einfaches Drohnenmathematik-Package DroneMaths geschrieben, das bei den Berechnungen hilft (die meisten Formeln sind hier nachzulesen). Zuerst benötigen wir eine Funktion zur Berechnung der Entfernung (in km) zwischen zwei Standorten anhand der dezimalen Längen-/Breitenkoordinaten (dabei wird auch die Erdkrümmung berücksichtigt, wobei dies für die kurzen Entfernungen, mit denen wir es hier zu tun haben, vernachlässigt werden kann – außer natürlich, die Drohnen fliegen sehr hoch). So wird sichergestellt, dass sich der gesamte Liefervorgang (einschließlich Rückkehr) im Bereich der maximalen Reichweite der Drohne abspielt.

Aber wie erfolgt die Navigation von einem Standort zum anderen? Wie sich herausstellt, handelt es sich hier um eine ganz traditionelle Navigation, wie sie für Schiffe verwendet wird. Dafür muss die Richtung (Kurs) von einem Standort zum anderen in Grad bekannt sein (0 Grad nach Norden, 90 Grad nach Osten, 180 Grad nach Süden und 270 Grad nach Westen).

Um Schiffe rund um den Erdball zu navigieren, braucht es nur Karte, Kompass und Zirkel. (Quelle: Shutterstock)

Um schließlich den nächsten „Wegpunkt“ zu ermitteln und der Drohne zu ermöglichen, auf kürzestem Weg von einem Standort zum anderen zu gelangen (also per Luftlinie, was voraussetzt, dass es auf dem Weg keine Hindernisse oder Flugverbotszonen gibt – und auch keine Krähen), verwenden wir die Funktion nextPosition() zur Berechnung der nächsten Drohnenposition. Damit wird die Richtung vom aktuellen Standort zum vorgesehenen Zielstandort ermittelt (Bestellstandort, Lieferstandort oder Stützpunkt-Standort, je nach aktuellem Zustand der Drohne), und bei gegebener Geschwindigkeit (wir nehmen an, dass sich die Drohne über die gesamte Strecke mit einer Durchschnittsgeschwindigkeit von 20 km/h bewegt) wird die im nächsten Zeitintervall (welches konfigurierbar ist und für eine höhere Geschwindigkeit als in Echtzeit-Simulationen skaliert werden kann) zurückzulegende Entfernung berechnet. Danach wird eine weitere Funktion aufgerufen, um den Standort anhand des aktuellen Standorts, der Reichweite und der Richtung zu berechnen.

Wenn wir das Ziel bereits erreicht haben, kann der Flug gestoppt und die nächste Aktion ausgeführt werden (z. B. Bestellung aufnehmen, Bestellung ablegen, zum Stützpunkt absenken). Momentan gehen wir davon aus, dass jede Flugroute erfolgreich ist, aber aus Spaß können auch verschiedene Ausnahmen wie Wind, Krähen, Zusammenstöße usw. mit der entsprechende Ausnahmebehandlung eingeführt werden (wieder je nach Drohnenzustand – wenn z. B. die Drohne aus irgendeinem Grund eine Lieferung nicht aufnehmen kann, muss die Bestellung neu geplant werden, vielleicht unter Verwendung einer Prioritätswarteschlange, sodass die nächste verfügbare Drohne erst der nicht gelieferten Bestellung zugeteilt wird, bevor weitere Bestellungen akzeptiert werden).

Die Activity nextLeg() (siehe oben) ist für die Berechnung der verschiedenen Drohnenbewegungen in jedem Abschnitt der Lieferung verantwortlich, aber sendet vor allem die aktualisierten Positionsdaten der Drohne und der zugehörigen Bestellung unter Verwendung von Cadence-Signalen, deren Funktionsweise im vorherigen Blog erklärt wurde. Auch der Drohnen-Workflow selbst signalisiert dem Bestellungs-Workflow Zustandsänderungen, z. B. mit orderWorkflow.signalOrder("droneHasOrder").

Der nächste Drohnenstandort wird anhand von aktuellem Standort, Richtung, Geschwindigkeit und Entfernung berechnet (Quelle: Shutterstock)

Drohnen werden elektrisch betrieben, sodass beim Fliegen, Schweben usw. Strom verbraucht wird. Für jede inkrementelle Entfernung, die die Drohne fliegt, reduzieren wir die Batterieladung mithilfe der Hilfsfunktion updateCharge(time), die den Ladungsverbrauch auf Basis der bereitgestellten Flugzeit berechnet.

Und natürlich muss die Batterieladung ausreichen, bis die Drohne zum Stützpunkt zurückgekehrt ist. (Quelle: Shutterstock)

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

In „Workflows mit Cadence optimieren“ haben wir Cadence, eine neue skalierbare, auf Entwickler fokussierte Open-Source-Workflow-Engine, vorgestellt. Cadence ist eine großartige Lösung für ein äußerst fehlertolerantes, zustandsabhängiges Workflow-Management, auch bekannt als „Orchestrierung“ (in Anlehnung an die Idee, dass ein Dirigent ein Orchester leitet). Eine andere beliebte Architektur für große nachrichtenbasierte dezentrale Systeme verwendet jedoch einen losgelösten Ansatz, der als „Choreografie“ bekannt ist (in Anlehnung an die Idee, dass Tänzer mit ihren unmittelbaren Nachbarn interagieren). Apache Kafka ist ein gängiges Beispiel und erlaubt zahlreichen Microservices, skalierbar und mit geringer Latenz miteinander zu kommunizieren, ohne dass eine zentrale Zustandsverwaltung erforderlich ist.

In vielen Unternehmen ist es allerdings üblich, schon jetzt zahlreiche Kafka-Microservices zu nutzen. Darüber hinaus verwenden typische moderne Unternehmenssysteme mehrere Technologien, sodass man im Endeffekt beide Architekturen kombinieren und Kafka-Microservices mit Cadence-Workflows integrieren muss. Anstatt nur ein Orchester oder nur Tänzer zu engagieren, müssen Sie ein Ballett aufführen, das beides kombiniert.

(Quelle: Shutterstock)

Es gibt eine ganze Reihe von Anwendungsfälle für Kafka in Kombination mit Cadence, in diesem Blog konzentrieren wir uns jedoch auf die Vorteile bei der Wiederverwendung von Kafka-Consumer-basierten Microservices aus Cadence-Workflows. Wie kann man also eine Nachricht von Cadence an Kafka senden, um eine Activity in Kafka zu starten, und wie wartet man asynchron auf eine Antwort und empfängt diese schließlich? In meiner Blog-Serie Anomalia Machina habe ich beispielsweise darüber berichtet, wie ich ein System zur Erkennung von Anomalien als Kafka-Consumer implementiert habe.

Was aber, wenn wir eine Prüfung auf Abweichungen im Rahmen eines Cadence-Workflows durchführen wollen?

Um eine Nachricht an Kafka zu senden, muss lediglich ein Kafka-Producer mit den richtigen Metadaten im Rahmen einer Cadence Activity Method verwendet werden, um sicherzustellen, dass eine Antwort gesendet/empfangen werden kann. Es gibt dabei verschiedene Wege, auf eine Antwort zu warten und sie zu erhalten. Das schauen wir uns jetzt mal genauer an.

1. Eine Nachricht von Cadence an Kafka senden

(Quelle: Shutterstock)

Glücklicherweise ist es ein wenig einfacher, eine Nachricht an Kafka zu senden, als Flaschenpost zu versenden, in der Hoffnung, dass die enthaltene Nachricht auch empfangen wird.

Bereiten Sie also zunächst Ihre „Flasche“ (den Kafka-Producer) vor, ich habe beispielsweise diese Eigenschaftsdatei verwendet:

Properties kafkaProps = new Properties();




        try (FileReader fileReader = new FileReader("producer.properties")) {

            kafkaProps.load(fileReader);

        } catch (IOException e) {

            e.printStackTrace();

        }

Die Datei producer.properties für einen Instaclustr Managed Kafka-Dienst wird folgendermaßen konfiguriert (die clusterspezifischen Details müssen über die Instaclustr-Konsole oder die Verwaltungs-API eingegeben werden):

bootstrap.servers=IP1:9092,IP2:9092,IP3:9092

key.serializer=org.apache.kafka.common.serialization.StringSerializer

value.serializer=org.apache.kafka.common.serialization.StringSerializer

security.protocol=SASL_PLAINTEXT

sasl.mechanism=SCRAM-SHA-256

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \

    username="kafakUser" \

    password="kafkaPassword";

Anschließend wird eine Cadence Activity Method festgelegt, die eine Nachricht an Kafka sendet:

public interface ExampleActivities {

     @ActivityMethod(scheduleToCloseTimeoutSeconds = 60)

     String sendMessageToKafka(String msg);

}




public static class ExampleActivitiesImpl implements ExampleActivities {

public String sendMessageToKafka(String msg) {

// The id is the workflow id

         String id = Activity.getWorkflowExecution().getWorkflowId();

        

         // Record contains: topic, key (null), value

         ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "", msg);

                  

// Add the workflow id to the Record Header

         producerRecord.headers().add(new RecordHeader("Cadence_WFID", id.getBytes()));




             try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {

                 producer.send(producerRecord);

                 producer.flush();

             } catch (Exception e) {

                 e.printStackTrace();

             }




return "done";

  }

}

Im Kafka-Datensatz wird das Zielthema, ein Nullschlüssel und der Nachrichtenwert angegeben. Mit dem Kafka-Header wird die Workflow-ID an Kafka übergeben, damit das System weiß, woher die Nachricht stammt. Ein Header enthält ein Key-Value-Paar, und jeder Kafka-Datensatz kann mehrere Header haben. Die Verwendung von Kafka-Headern zur Unterstützung von OpenTracing ist bereits bekannt.

Warum werden Header für die Workflow-ID verwendet und nicht etwa der Datensatzschlüssel?

Nun, wenn die vorhandenen Kafka-Microservices wiederverwendet werden sollen, dürfen sich die Änderungen auf der Kafka-Seite nur minimal auswirken. Bestehende Kafka-Consumers gehen von dem Schlüssel und dem Wert des Datensatzes aus, aber die Idee hinter einem Kafka-Header ist, dass man optionale Metadaten hinzufügt, wodurch bestehende Implementierungen nicht unterbrochen, sondern leicht erweitert werden können, sodass die Metadaten verstanden werden und eine entsprechende Reaktion erfolgt.

Wenn es Ihnen nur darum geht, eine Nachricht an Kafka zu senden und dann mit dem übrigen Workflow fortzufahren („Fire-and-Forget“), dann ist Ihre Arbeit hiermit getan. Eigentlich brauchen Sie keine Workflow-ID zu senden, wenn Sie keine Antwort erwarten (es sei denn, es gibt einen anderen Grund, warum der Kafka-Consumer wissen muss, dass die Nachricht von Cadence stammt). Der Workflow (in BPMN-ähnlicher Darstellung) sähe dann so aus (mit einer optionalen zweiten Aufgabe usw.).

Wir gehen allerdings davon aus, dass es in unserem Anwendungsfall eine Verarbeitung auf der Kafka-Seite als Reaktion auf den Empfang der Nachricht gibt. Das Ergebnis muss wieder vom Workflow empfangen werden, bevor weitere Activities durchgeführt werden können. Wie lässt sich die Schleife beenden?

2. Einführung von Cadence-Signalen

Signallampen werden zur Kommunikation zwischen Schiffen auf See verwendet. https://commons.wikimedia.org/wiki/File:Seaman_send_Morse_code_signals.jpg

Für eine Antwort von Kafka brauchen wir etwas Besseres als Flaschenpost. Cadence-Signale basieren auf UNIX-ähnlichen IPC-Signalen (Interprozesskommunikation), die es einem Prozess oder einem bestimmten Thread erlauben, Nachrichten zu senden, und ihn über ein Ereignis zu informieren (z.B. „kill -9 <pid>“!). In einem kürzlich erschienenen Blog über Apache ZooKeeperTM und Curator haben wir auch über die Verwendung von Signalen (Semaphoren, Abschnitt 3.3) berichtet.

Aus der Dokumentation geht hervor, dass Cadence-Signale:

„… einen vollständig asynchronen und dauerhaften Mechanismus zur Bereitstellung von Daten für einen laufenden Workflow bieten. Wenn ein Signal für einen laufenden Workflow empfangen wird, hält Cadence das Ereignis und die Nutzdaten in der Workflow-Historie fest. Im Workflow kann das Signal später jederzeit verarbeitet werden, ohne dass die Informationen verloren gehen. Außerdem kann die Ausführung des Workflows durch Blockieren eines Signalkanals gestoppt werden.”

In meinem ersten Cadence-Blog habe ich darüber berichtet, dass es bei jedem Workflow eine Schnittstelle und eine Implementierung mit mehreren möglichen Methoden gibt, aber nur eine Methode mit der Kennzeichnung @WorkflowMethod, die anzeigt, bei welcher Methode es sich um den Einstiegspunkt für den Workflow handelt. Es gibt auch andere Methoden, die mit @SignalMethod gekennzeichnet sind. Zum Beispiel:

public interface ExampleWorkflow {

        @WorkflowMethod(executionStartToCloseTimeoutSeconds = 120, taskList = activityName)

        String startWorkflow(String name);

        @SignalMethod

        void signalKafkaReply(String name);

    }

Und die Umsetzung dieser beiden Methoden:

public static class ExampleWorkflowImpl implements ExampleWorkflow { String message = "";

     private ExampleActivities activities = null;

    

     public ExampleWorkflowImpl() {

            this.activities = Workflow.newActivityStub(ExampleActivities.class);

     }

    

     @Override

     public String startWorkflow(String msg) {

         String r1 = activities.sendMessageToKafka(msg);

         Workflow.await(() -> message != "");

         System.out.println("workflow got signal = " + message);

           return message;

         }

     }

        

     @Override

     public void signalKafkaReply(String msg) {

         message = msg;

     } 

}

 

Es können ganz unterschiedliche Signalmethoden verwendet werden, aber für dieses Beispiel verwende ich aus praktischen Gründen nur eine Signalmethode. Jedes Mal, wenn ein ExampleWorkflow.signalKafkaReply-Signal an eine Workflow-Instanz gesendet wird, kommt es zum Aufruf der entsprechenden Signalmethode. Signalmethoden geben nie einen Wert zurück, sondern bewirken etwas im Hintergrund: In diesem Fall wird die Nachrichtenvariable mit dem Wert des Signalarguments belegt.

Abschließend ist noch anzumerken, dass die startWorkflow-Methode implementiert wurde, die nun die sendMessageToKafka-Activity Methode abruft. Danach wird sie sofort durch die Funktion Workflow.await() blockiert, und zwar so lange, bis die Funktion, die sie als Parameter erhält, auf True gesetzt wird, d. h. bis dieser Workflow ein Signal mit einem Nachrichtenwert erhält, der ungleich null ist. Das Design von Cadence lässt eine Auswertung der Bedingung nur bei Statusänderungen des Workflows zu (d. h. es findet kein Abruf statt).

Wie lassen sich also Cadence-Signale für die Integration mit Kafka nutzen?

2. Cadence-Signale mit Kafka integrieren: Erster Ansatz

Im Folgenden wird der Workflow unter Verwendung von Signalen beschrieben:

Wir erfahren, wie das gesamte Muster funktioniert, also das Senden einer Nachricht an Kafka, das Warten auf eine Antwort und die Meldung der richtigen Workflow-Instanz von Kafka. Die ersten zwei Teile haben wir bereits behandelt. Der letzte Teil der Aufgabe ist die Frage, wie ein Kafka-Consumer eine bestimmte Workflow-Instanz melden kann. Im zuvor genannten Kafka-Producer haben wir die Workflow-ID im Datensatz-Header gesendet. So kann der Kafka-Consumer diese Information abrufen und

  1. erkennen, dass der Datensatz von Cadence stammt, und
  2. die Antwort an den richtigen Workflow zurückschicken.

Der Kafka-Consumer muss so konfiguriert werden, dass er sich zum einen mit dem Kafka-Cluster verbindet, an den der Datensatz gesendet wurde, und zum anderen mit dem Cadence-Cluster, der diesen Workflow verwaltet (was zu KafkaConsumer– und WorkflowClient-Objekten führt). Die Hauptabrufschleife des Kafka-Consumer sieht so aus:

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {

            consumer.subscribe(Collections.singleton("topic1"));




            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {

                    String id = "";

                    // If the header has Cadence_WFID key then we send the response to Cadence

                    for (Header header : record.headers()) {  

                        if (header.key().matches("Cadence_WFID"))

                         id = new String(header.value());

                    }

                    if (id != "")

                    {

                       ExampleWorkflow workflowById = workflowClient.newWorkflowStub(ExampleWorkflow.class, id);

                        workflowById.signalKafkaReply("Kafka consumer received value = " + record.value());                        

                    }

                    // else don't send signal! But do whatever is normal

                }

            }

Es handelt sich hierbei nur um ein Anschauungsbeispiel, d. h. der Kafka-Consumer verarbeitet die Anfrage nicht wirklich, und die Antwort ist nur eine Zeichenkette, die den Anfragewert enthält. Wird im Header „Cadence_WFID“ gefunden, wird zunächst mit WorkflowClient.newWorkflowStub() ein neuer Stub für den Workflow mit ExampleWorkflow.class und der Workflow-ID erstellt. Anschließend wird das Signal durch Aufruf der Signalmethode an diesen Stub gesendet. Andernfalls erfolgt die normale Verarbeitung und Lieferung der Ergebnisse. Wenn der Workflow das Signal erhalten hat, wird die Blockierung aufgehoben, und er setzt seine Arbeit fort, indem er die Antwort verwendet oder gegebenenfalls weitere Activities durchführt.

In diesem Beispiel wird davon ausgegangen, dass für jede an Kafka gesendete Nachricht nur ein Signal als Antwort gesendet wird. In Kafka können auch Consumer-Gruppen angelegt werden, sodass theoretisch mehrere Signale gesendet werden könnten. In diesem Fall spielt das jetzt keine Rolle, da es nur auf das erste Signal ankommt. Bei anderen Beispielen müssen eventuell mehrere Signale richtig verarbeitet werden.

Mehr Infos zu Cadence-Signalen gibt es hier und hier.

3. Abschluss von Activities in Cadence: Zweiter Ansatz

Abschlussarbeiten auf der Sydney Harbour Bridge („den Bogen spannen“). https://commons.wikimedia.org/wiki/File:SLNSW_44281_Sydney_ferry_Kubu_passes_near_the_Harbour_Bridge_as_the_lower_chord_is_ready_for_joining.jpg circa 1930

Der Abschluss von asynchronen Activities in Cadence (Cadence Asynchronous Activity Completion) ist ein alternativer Ansatz zur Lösung dieses Problems. Mehr Infos zum Abschluss von Activities in Cadence gibt es hier, hier und hier. Die Dokumentation zeigt, dass es sich um eine perfekte Ergänzung handelt:

Manchmal geht der Lebenszyklus einer Activity über einen synchronen Prozessaufruf hinaus. So kann zum Beispiel eine Anfrage in eine Warteschlange gestellt werden und zu einem späteren Zeitpunkt kommt eine Antwort und wird von einem anderen Worker-Prozess abgeholt. Die gesamte Anfrage-Antwort-Interaktion lässt sich als eine einzige Cadence-Activity modellieren.

Wie unterscheidet sich dieser Ansatz von dem Signalansatz? Der Hauptunterschied liegt in der Blockierung, die nun (konzeptionell) in der Activity-Method und nicht mehr im Workflow selbst stattfindet. Außerdem ist der Aufruf zum Abschluss mit der Activity und nicht mit dem Workflow verbunden. Es handelt sich jedoch immer noch um eine „Punkt-zu-Punkt“-Kommunikation, die über Prozessgrenzen hinweg funktioniert Dieser Workflow sieht dann folgendermaßen aus:

Die Implementierung des Haupt-Workflows ist jetzt ganz einfach, da weder Wartezeiten noch eine Signalmethode erforderlich sind:

public static class ExampleWorkflowImpl implements ExampleWorkflow {      




private ExampleActivities activities = null;

     public ExampleWorkflowImpl() {

            this.activities = Workflow.newActivityStub(ExampleActivities.class);

        } 

 

@Override

public String startWorkflow2(String msg) {

String r1 = activities.sendMessageToKafka2(msg);

      return r1;

} 

}

Die Activity-Method ist jedoch etwas komplizierter und sieht wie folgt aus:

public String sendMessageToKafka2(String msg) {

ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "", name);  

// The task token is now used to correlate reply. 

      byte[] taskToken = Activity.getTaskToken();          

      producerRecord.headers().add(new RecordHeader("Cadence_TASKID", taskToken));




      try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {

                 producer.send(producerRecord);

                 producer.flush();

             } catch (Exception e) {

                 e.printStackTrace();

      }

// Don’t complete - wait for completion call

      Activity.doNotCompleteOnReturn();

// the return value is replaced by the completion value

      return "ignored";

}

Alternativ zur Workflow-ID (wie zuvor bei den Signalen beschrieben) wird nun der Task-Token ermittelt und als Wert des Schlüssels „Cadence_TASKID“ in den Kafka-Header eingefügt. Die Nachricht wird wie zuvor mit einem Kafka-Producer an das Kafka-Topic gesendet. Die Activity zeigt dann jedoch mit der Methode Activity.doNotCompleteOnReturn() an, dass sie bei Rückgabe nicht „abgeschlossen“ ist. Der Rückgabewert wird „ignoriert“ (es kann sich dabei um einen beliebigen Wert handeln). Doch was bedeutet das? Grundsätzlich wird jeder Wert, der von der Activity-Method zurückgegeben wird, verworfen und später durch den Wert ersetzt, der durch den Aufruf zum Abschluss bereitgestellt wird.

Auch beim Kafka-Consumer gibt es einige Änderungen, und zwar:

ActivityCompletionClient completionClient = workflowClient.newActivityCompletionClient();




try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {

            consumer.subscribe(Collections.singleton("topic1"));




            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {

                    byte[] task = null;

                    // If the header has key = Cadence_TASKID then send response to Cadence task using completion

                    for (Header header : record.headers()) {

                       if (header.key().matches("Cadence_TASKID"))

                        task = Arrays.copyOf(header.value(), header.value().length);

                    }

                    if (task != null)

                     completionClient.complete(task, "Kafka consumer received value = " + record.value());

                    // else process as normal

            }

        }

Zunächst wird ein ActivityCompletionClient unter Verwendung der WorkflowClient-Instanz erstellt (die mit den richtigen Cadence-Cluster-Einstellungen angelegt wurde). Wird in Kafka der Schlüssel „Cadence_TASKID“ im Datensatz-Header für ein Ereignis gefunden, wird die Abschlussmethode mit dem Task-ID-Wert und einem Response-Wert aufgerufen. Dieser Response-Wert wird von der Activity-Method zurückgegeben, die blockiert ist und auf den Abschluss wartet.

Im Kafka-Consumer kann ein „Handler“ für den Signalansatz und den Abschlussansatz vorhanden sein. Wenn keiner der beiden Ansätze im Header vorhanden ist, findet eine normale Verarbeitung statt.

Das Thema „Abschluss“ war mir immer noch ein Rätsel, also fragte ich unsere Entwickler nach weiteren Details (danke Kuangda, Matthew und Tanvir). Hier das Wichtigste in Kürze:

Der Abschluss (Completion) ist ein praktisches paralleles Programmiermuster, um auf einen externen Callback zu warten, der das Ergebnis liefert (Push), anstatt das Ergebnis abzufragen (Pull). Der ursprüngliche Rückgabewert wird ignoriert, da er eigentlich nur einen speziellen Fehler zurückgibt (mithilfe eines Golang-Fehlerbehandlungsmusters), der signalisiert, dass das Resultat, das vom Completion-Callback geliefert wird, noch aussteht.

In der Cadence-go-Client-Dokumentation wird dies näher ausgeführt. Bei Java gibt es einen ähnlichen Mechanismus, der Futures und CompletableFutures verwendet, und einen weiteren Blog.

4. Kafka und Cadence: Fazit

(Quelle: Shutterstock)

Wir sind jetzt am Ende dieses Blogs angelangt und haben die Ansätze „Signale“ und „Abschluss von Activities“ kennengelernt – zwei Lösungen für die Integration von Kafka-Microservices mit Cadence-Workflows, mit denen sich die zwei Welten der Choreografie und der Orchestrierung perfekt kombinieren lassen. Beide Lösungen funktionieren einwandfrei. Ich hoffe, die „Aufführung“ hat Ihnen gefallen.

Der Unterschied besteht im Wesentlichen darin, dass „Signale“ auf der Ebene von Workflows arbeiten, und „Abschlüsse von Activities“ dafür detaillierter angelegt sind und auf der Ebene von Aufgaben/Activities agieren. Zur Korrelation komplexerer Workflows mit mehreren gleichzeitigen Aufgaben/Activities, die mit Kafka kommunizieren, kann dies durchaus ein Vorteil sein.

Beim Ansatz „Abschluss“ regelt das Activity Timeout die Zeitüberschreitung für den gesamten Nachrichtenumlauf zu und von Kafka. Beim Ansatz „Signal“ hingegen bieten die separate Zeitüberschreitung für die Activity (Senden der Nachricht an Kafka) und das anschließende Warten auf die Antwort von Kafka mehr Flexibilität.

Abschließend sei noch darauf hingewiesen, dass „Signale“ und „Abschluss von Aktivtäten“ einen Remote-Aufruf darstellen und eine Verzögerung beim Kafka-Consumer verursachen. Der Ansatz „Signale“ erfordert 2 Remote-Aufrufe, während der Ansatz „Abschluss“ nur 1 Remote-Aufruf benötigt und daher eventuell etwas schneller ist. Es kann auch sein, dass die Anzahl der Kafka-Topic-Partitionen und -Consumer erhöht werden muss, um einen ausreichenden Durchsatz zu erzielen.

Der Code ist hier verfügbar.

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

1. Was ist Cadence?

Cadence Logo

Beim Radfahren ist Kadenz (engl. Cadence) ein anderes Wort für Trittfrequenz, also die Drehzahl, mit der die Tretkurbel betätigt wird. Das Fahren mit hoher Kadenz wird Spinning genannt. Beim Fahren mit niedriger Kadenz, dem sogenannten Grinding oder Mashing, ist man langsamer. Und es ist schlecht für die Kniegelenke.

Die rasante Zunahme von Big-Data-Anwendungsfällen in den letzten zehn Jahren wurde durch beliebte, hochgradig skalierbare Open-Source-Technologien wie Apache Cassandra® für Speicherung, Apache Kafka® für Streaming und OpenSearch® für Suchen noch beschleunigt. Nun begrüßen wir ein neues Mannschaftsmitglied: Cadence zur Workflow-Orchestrierung.

Mein Mitbewohner ist mit seinem Cello immer auf dem Fahrrad zur Musikschule gefahren. Das sah ungefähr so wie auf dem Foto aus. Aber Musik und Fahrradfahren haben noch etwas anderes gemeinsam: Bei beidem arbeitet man mit Kadenzen!

(Quelle: Adobe Stock)

In der Musik ist eine Kadenz eine musikalische Phrase, die den Abschluss eines Stücks oder eines Abschnitts markiert. Beim Radfahren zeigt die Kadenz an, wie hoch die Trittfrequenz des Radfahrers ist, was sich wiederum auf die Effizienz und das Tempo der Fahrt auswirkt.

Als Workflow-Orchestrierung wird der Prozess bezeichnet, mit dem Reihen von Aufgaben festgelegt und ausgeführt werden, sowie die Reihenfolge, in der sie ausgeführt werden. Einige Aufgaben müssen vielleicht nacheinander ausgeführt werden, andere gleichzeitig. Nach diesem Prinzip funktionieren auch Orchesterpartituren, aus denen Dirigent, Musiker und Chor ablesen können, welche Noten wann zu spielen bzw. zu singen sind.

Von Max Reger—IMSLP, lizenzfrei, (https://commons.wikimedia.org/w/index.php?curid=51020706)

In der üblichen Workflow-Terminologie (z. B. der von BPMN) besteht ein Workflow aus Start- und Endereignissen, Activities (Atomic Task oder Composite Task), Gateways (Verzweigungen) und Sequenzen bzw. Flüssen – die Reihenfolge von Activities in einem Workflow.

In der Informatik habe ich einige der früheren Versuche erlebt, Workflows zu implementieren und zu modellieren, u. a. Enterprise Java Beans (Stateful Session Beans wurden für Workflows verwendet), ebXML, BPEL und die Modellierung und Evaluierung von Workflows mithilfe von ereignisorientierter Simulation. In der traditionellen Workflow-Modellierung wird die Ausführungssemantik mit Hilfe von semi- formalen Spezifikation, Endlichen Zustandsautomaten (EA), Markov-Modellen, ereignisorientierter Simulation und Petri-Netzen, genutzt, um Workflows festzulegen, zu visualisieren und zu analysieren.

Die Open-Source-Lösung Cadence wurde von Uber auf der Grundlage von AWS SWF (Simple Workflow Service) entwickelt. Sie nutzt Apache Cassandra (und andere Datenbanken) für Persistenz und ist auf eine hohe Fehlertoleranz und Skalierbarkeit ausgelegt. Das vielleicht überraschendste Merkmal von Cadence im Vergleich zu anderen Workflow-Engine-Ansätzen, die ich bereits kennengelernt habe, ist der Fokus auf die Unterstützung der Entwickler, Workflows hauptsächlich in Java oder Go zu schreiben (weitere Sprachen sind verfügbar). Eine spezielle Visualisierungsnotation oder ein weiteres Tool zum Festlegen des Workflow wird daher nicht benötigt – die Semantik besteht einfach nur aus Code. Mit dem Cadence-Web-Client können Workflows jedoch bei der Ausführung visualisiert werden.

(Quelle: Shutterstock)

Zum Fahrradfahren braucht man eine Menge Ausrüstung: beispielsweise ein Fahrrad (logisch…), einen Helm, Schuhe, Wasserflasche, Brille, Handschuhe, Lampe, Pumpe, Shirt und Hose aus Lycra (optional) usw. Für den erfolgreichen Einsatz von Cadence werden ebenfalls viele Elemente benötigt, unter anderem Workflows, Activities, Domains, Workflow-Clients und Worker. Sehen wir uns diese Elemente nacheinander mal genauer an.

2. Cadence – Workflows

Bei Workflows in Fabrikanlagen muss die Reihenfolge der Aufgaben eindeutig festgelegt werden. (Quelle: Shutterstock)

Beginnen wir mit dem Grundkonzept von Cadence: Workflows. Ich werde den Cadence-Java-Client benutzen, und würde Ihnen empfehlen diesen herunterzuladen und entsprechend zu konfigurieren, damit Sie ihn in der von Ihnen gewählten IDE kompilieren können. (Ich nutze Eclipse und Maven. Zur Vereinfachung habe ich die Importe weggelassen). Es gibt einige Beispiele für Java-Clients, die mich inspiriert haben. Zunächst benötigen wir eine Workflow-Interface und -Implementierung.

Workflow-Interface:

static final String activityName = "ExampleActivity";

    public interface ExampleWorkflow {

        @WorkflowMethod(executionStartToCloseTimeoutSeconds = 120, taskList = activityName)

        void startWorkflow(String name);

    }

Workflow-Implementierung:

public static class ExampleWorkflowImpl implements ExampleWorkflow {

   

     private ExampleActivities activities = null;

    

     public ExampleWorkflowImpl() {

            this.activities = Workflow.newActivityStub(ExampleActivities.class);

        }

    

        @Override

        public void startWorkflow(String name) {

         System.out.println("Started workflow! ID=" + Workflow.getWorkflowInfo().getWorkflowId());

        

         String r1 = activities.task1(name);

           String r2 = activities.task2(r1);

            

         System.out.println("Workflow result = " + r2 + " ID=" + Workflow.getWorkflowInfo().getWorkflowId());

        }

       

    }

In Cadence gibt es immer genau eine Methode mit der @WorkflowMethod-Annotation, die als Startereignis für den Workflow fungiert und den Sequenzfluss und Activities enthält. Ihr Aufruf startet eine stateful Workflow-Instanz, die schließlich endet.

In der Implementierung ruft die Methode startWorkflow() zwei weitere Methoden nacheinander auf (task1, task2), wodurch wir eine Workflow-Logik (einfach sequenziell) und 2 Activities erhalten.

Jede ausgeführte Workflow-Instanz verfügt über eine eindeutige ID, die wir im Beispiel oben verwendet haben. Sie brauchen eine Workflow-ID, da Workflows über mehrere Instanzen verfügen können.

Hier ist es aber noch komplexer. Tatsächlich funktioniert Cadence nicht nur nach dem POJI/POJO-Modell. Es ist eine Plattform, mit der sich stateful Workflows skalierbar und zuverlässig ausführen lassen.

Sie bemerken sicher die Elemente timeout, activities und taskList im vorstehenden Beispiel. Timeouts sind notwendig, weil jeder Workflow, ganz wie ein Musikstück oder eine Fahrradtour, irgendwann enden muss.

3. Cadence – Activity

Einige Leute veranstalten die sonderbarsten Sachen mit ihrem Fahrrad. Das hier wird wahrscheinlich schiefgehen. (Quelle: Shutterstock)

Activities sind ein zentrales Element von Cadence. Workflows sind stateful und fehlertolerant. In einer verteilten Microservices-Architektur möchten sie aber schließlich Remote-APIs aufrufen, um Aufgaben auszuführen. Remote-API-Aufrufe können jedoch fehlerhaft sein und nicht fehlertolerant gestaltet werden, da sie außerhalb des Cadence-Workflows liegen. Cadence nutzt sie, damit jeder Code, einschließlich Remote-Aufrufen, in eine Cadence-Activity eingebunden werden kann, unter dem Vorbehalt, dass Cadence bei Fehlern den Zustand der Activity nicht wiederherstellt und Activities garantiert höchstens einmal ausgeführt werden. (Idempotente Activities werden bei Fehlern automatisch erneut ausgeführt, nicht-idempotente Activities müssen von einer unternehmensspezifischen Logik behandelt werden, einschließlich Kompensations-Activities. Weitere Informationen zu RetryOptions finden Sie hier.)

Activities sind ebenfalls nur ein POJO/POJI-Paar und enthalten die auszuführenden Aufgaben/Methoden. Jede Methode verfügt über eine @ActivityMethod-Annotation mit Optionen.

public interface ExampleActivities

    {

     @ActivityMethod(scheduleToCloseTimeoutSeconds = 60)

     String task1(String name);

     @ActivityMethod(scheduleToCloseTimeoutSeconds = 60)

         String task2(String name);

    }




public static class ExampleActivitiesImpl implements ExampleActivities

    {

         public String task1(String arg) {

         System.out.println("task1 " + arg + " in " + Activity.getWorkflowExecution().getWorkflowId());

         return arg+"task1";

         }




    public String task2(String arg) {

         System.out.println("task2 " + arg + " in " + Activity.getWorkflowExecution().getWorkflowId());

         return arg+"task2";

         }    

    }

Wir haben die Activities bereits im Workflow-Konstrukteur oben registriert. Beachten Sie, dass man etwas anders vorgehen muss, um die Workflow-ID mit einer Activity zu verknüpfen. Brauchen wir also nur einen Workflow und Activities um loszulegen?

4. Cadence – Domains

Königreich: Staatsgebiet, dessen Beherrschung in die Domäne eines Monarchen fällt (falls Sie mal einen Tipp für das Kreuzworträtsel brauchen) (Quelle: Shutterstock)

Cadence nutzt das Konzept der Domains, die im Grunde nur ein Namensraum für die darin enthaltenen Workflows sind. Sie müssen eine Domain erstellen oder eine bestehende Domain wiederverwenden, bevor Sie einen Workflow oder Worker starten können. Hier sehen Sie ein Beispiel für die Registrierung einer neuen Domain (oder für deren Wiederverwendung, falls sie bereits besteht):

public static void registerDomain(String host, String domain)

    {  

     String nameDescription = "a new domain";

    

      IWorkflowService cadenceService = new WorkflowServiceTChannel(ClientOptions.newBuilder().setHost(host).setPort(7933).build());

        RegisterDomainRequest request = new RegisterDomainRequest();

        request.setDescription(nameDescription);

        request.setEmitMetric(false);

        request.setName(domain);

        int retentionPeriodInDays = 1;

        request.setWorkflowExecutionRetentionPeriodInDays(retentionPeriodInDays);

        try {

          cadenceService.RegisterDomain(request);

          System.out.println(

              "Successfully registered domain "

                  + domain

                  + " with retentionDays="

                  + retentionPeriodInDays);

        } catch (DomainAlreadyExistsError e) {

          System.out.println("Domain " + domain + " is already registered")

    }

Und nur für den Fall, dass Sie sich fragen, was IWorkflowService und WorkflowServiceTChannel sind (Ich habe mich das auch schon gefragt, und wir werden sie später auch nochmal benutzen)! Diese Typen sind im ServiceClient-Paket enthalten, das nicht gut dokumentiert zu sein scheint, aber es ist anscheinend die Hauptmethode, um eine Verbindung zum Cadence-Server herzustellen.

„Der ServiceClient ist ein RPC-Service-Client, der eine Verbindung zum Cadence-Service herstellt. Er dient auch als Baustein für die anderen Clients.“

Im Java-Client-Code habe ich folgende Client-Typen gefunden (im Client-Paket und im ServiceClient-Paket): client/ActivityCompletionClient, client/WorkflowClient, serviceclient/IWorkflowService. (Vielleicht gibt es noch mehr.)

5. Cadence – Workflow-Client

Wie kommunizieren Sie mit dem Cadence-Server? (Quelle: Shutterstock)

Nun sind wir so weit, dass wir den Beispielcode ausprobieren können. Hier ist etwas Beispielcode, um eine Domain und eine neue WorkflowClient-Instanz zu erstellen:

String host  = "Cadence Server IP";

    

String domainName = "example-domain";

    

registerDomain(domainName);

    

WorkflowClient workflowClient =

                WorkflowClient.newInstance(

                        new WorkflowServiceTChannel(ClientOptions.newBuilder().setHost(host).setPort(7933).build()),

                        WorkflowClientOptions.newBuilder().setDomain(domainName).build());

6. Cadence – Worker

Ich hätte nie gedacht, dass es „zu viele Fahrräder“ geben kann, bis ich in die Niederlande reiste und ganze Horden von Fahrradfahrern erlebte. Das wichtigste Zubehör ist hier nicht der Helm, sondern die Klingel! (Quelle: Adobe Stock)

Wenn Sie den Beispielcode oben versuchsweise ausführen, werden Sie feststellen, dass keine der Aufgaben ausgeführt wird und für den Workflow schließlich ein Timeout eintritt. Was ist schiefgegangen? Activities werden durch Worker ausgeführt. Bevor Arbeiten ausgeführt werden können, müssen für die Activities Worker erstellt werden. Ähnlich ist es bei den Musikern eines Orchesters. Vielleicht sind Dirigent und Partitur schon da. Aber die Partitur wird erst gespielt, wenn die Musiker mit ihren Instrumenten auf die Bühne gekommen und bereit für die Vorführung sind. Dann erst kann der Dirigent das Konzert beginnen.

Wir haben also jetzt einen WorkflowClient und können einen Worker erstellen. Sie müssen den String activityName festlegen und den Workflow und die ActivitiesImplementations im Workflow registrieren, bevor Sie diesen starten. Wenn Sie es versäumen, die Activities zu registrieren, wird kein Fehler gemeldet. Es geschieht aber auch nichts, weil Cadence darauf wartet, dass ein Worker-Prozess startet, zumindest bis zum Workflow-Timeout:

        WorkerFactory factory = WorkerFactory.newInstance(workflowClient);

        Worker worker = factory.newWorker(activityName);

        worker.registerWorkflowImplementationTypes(ExampleWorkflowImpl.class);




        worker.registerActivitiesImplementations(new ExampleActivitiesImpl());

        

        factory.start();

Beachten Sie, dass die Worker in der Regel in einem vom Workflow getrennten Prozess und aus Gründen der Skalierbarkeit auf (mehreren) Servern mit ausreichenden Ressourcen ausgeführt werden, da hier die eigentliche Workflow-Aufgabe „work“, also die Arbeit, erledigt wird.

Aber wie viele Worker haben wir tatsächlich? Es bestehen standardmäßige Beschränkungen für Gleichzeitigkeit und Rate. Die standardmäßigen Einstellungen von WorkerOptions sind

Wenn Sie clever sind und Workflows und Activities (jeweils Anzahl und Zeiten) ausreichend überwachen, könnten Sie vermutlich Little’s Law verwenden, das Gleichzeitigkeit, Durchsatz und Zeit verknüpft, um die magischen Zahlen genauer zu schätzen. Hier sehen Sie ein Beispiel für das Ändern der WorkerOptions-Standardeinstellungen, inklusive einer Erklärung, wie sinnvoll vorgegangen werden kann.

7. Cadence – Workflow-Ausführung: synchron oder asynchron

(Quelle: Shutterstock)

Jetzt ist es an der Zeit, wie folgt die Workflow-Instanz auzuführen und zu starten:

ExampleWorkflow exampleWorkflow = workflowClient.newWorkflowStub(ExampleWorkflow.class);

exampleWorkflow.startWorkflow("workflow 1");

Workflows können synchron oder asynchron gestartet werden. Zwei Instanzen starten synchron:

ExampleWorkflow exampleWorkflow = workflowClient.newWorkflowStub(ExampleWorkflow.class);

exampleWorkflow.startWorkflow("workflow 1 sync");




ExampleWorkflow exampleWorkflow2 = workflowClient.newWorkflowStub(ExampleWorkflow.class);

exampleWorkflow2.startWorkflow("workflow 2 sync");

Asynchroner Start:

ExampleWorkflow exampleWorkflow3 = workflowClient.newWorkflowStub(ExampleWorkflow.class);




CompletableFuture r3 = WorkflowClient.execute(exampleWorkflow3::startWorkflow, "workflow async");

CompletableFuture r4 = WorkflowClient.execute(exampleWorkflow3::startWorkflow, "workflow 2 async");




          try {

r3.get();

r4.get();

} catch (InterruptedException e1) {

e1.printStackTrace();

} catch (ExecutionException e1) {

e1.printStackTrace();

}

Denken Sie daran, den Abschluss der asynchronen Workflows abzuwarten (wie in diesem Beispiel mit der Verwendung von get()). Andernfalls wird nicht viel geschehen, da die Worker in diesem Beispiel in demselben Thread sind.

8. Cadence – asynchrone Activities und Blockieren

(Quelle: Shutterstock)

Nicht nur Workflows lassen sich asynchron aufrufen, auch Activities können gleichzeitig ausgeführt werden. Dies ist hilfreich, wenn Sie eine große Anzahl an gleichzeitigen Aufgaben ausführen und dann abwarten möchten, bis alle abgeschlossen sind. Lassen Sie uns beispielsweise die ursprüngliche Methode startWorkflow(String name) mit der neuen Methode startConcurrentWorkflow(int max) ersetzen. Zudem verwenden wir die ursprüngliche Activity task1() erneut, führen sie jedoch gleichzeitig aus:

        public void startConcurentWorkflow(int concurrency) {

            List<Promise> promises = new ArrayList<>();

            List processed = null;

            try {

                       for (int i=0; i<concurrency; i++)

             {

                        Promise x = Async.function(activities::task1, “subpart “ + i);

             promises.add(x);

             }

                

                // allOf converts a list of promises to a single promise that contains a list

                // of each promise value.

                Promise<List> promiseList = Promise.allOf(promises);




                // get() blocks on all tasks completing

                List results = promiseList.get();

                

                int count = 0;

                for (String s: results)

                {

                 System.out.println("Processed " + s);

                 count++;

                }

                System.out.println("Processed “ + count + " concurrent tasks");

            } finally {

                }

            }

        }

Beachten Sie, dass wir die Methode Async.function(activities::activityName, arg) und die Promise-Klasse mit den Methoden add(), allOf() und get() verwenden müssen, um korrekt zu blockieren, bis alle Activities abgeschlossen sind.

9. Cadence – Einschränkungen

Dieses Schild weist vermutlich darauf hin, dass hier keine Traktoren, Pferdekutschen und Fahrräder fahren dürfen. (Quelle: Shutterstock)

Wirklich clever, wie Cadence den Status von Workflows verwaltet! Beim üblichen Ansatz für Persistenz und zuverlässige Workflow-Engines wird der Status jeder stateful Workflow-Instanz in einer Datenbank beibehalten. Cadence gibt jedoch Statusänderungen als Historienereignisse an die Datenbank aus. Bei Fehlern (selbst, wenn der Workflow im Ruhezustand war) startet Cadence den Workflow einfach erneut und wiederholt die Historie, um zu bestimmen, welche Aufgabe als nächste ausgeführt werden muss. Hierbei kommen sog. event sourcing pattern zum Einsatz, die, ähnlich wie Kafka Streams, den Zustand aus den Ereignisströmen oder Ereignisströme aus dem Zustand berechnen können.

Bei dieser Vorgehensweise wird eine große Anzahl an gleichzeitig ausgeführten Workflow-Instanzen unterstützt, wie sie bei Workflows mit langer Laufzeit und hohem Durchsatz wahrscheinlich vorkommen werden. Dies hat allerdings negative Folgen für lange Workflows: Es sind immer größere Historien erforderlich, und viele Ereignisse müssen wiederholt werden, um den korrekten Zustand zu erhalten.

Dieser Ansatz zieht jedoch einige wichtige Beschränkungen für die Workflow-Implementierung nach sich, denen Sie sich bewusst sein müssen – sie müssen im Prinzip deterministisch sein. Die Cadence-Workflow-Klasse verfügt über viele hilfreiche Methoden, die anstelle von unsicheren / nicht-deterministischen / von Nebeneffekten betroffenen standardmäßigen Java-Methoden verwendet werden können, insbesondere für Zeit, Ruhezustand und Zufallszahlen.

Das ist so ziemlich alles, was ich bis jetzt über Cadence weiß. Es gibt jedoch noch viel mehr zu entdecken, u. a. über die Behandlung von Ausnahmen, Neuversuche, lang laufende Transaktionen, verschiedene Arten von Timeouts, Kompensations-Activities, Signale, Abfragen, abgeleitete Workflows, den Aufruf von Remote-APIs und die Integration mit Kafka (z. B. das Senden und Blockieren/Empfangen einer Nachricht als Activity).

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence