Korrektes Herunterfahren von Aktivitäts- und Workflow-Workern - AWS SDK for Java 1.x

Die Version AWS SDK for Java 1.x wurde end-of-support am 31. Dezember 2025 erreicht. Wir empfehlen Ihnen, auf den zu migrieren AWS SDK for Java 2.x, um weiterhin neue Funktionen, Verfügbarkeitsverbesserungen und Sicherheitsupdates zu erhalten.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Korrektes Herunterfahren von Aktivitäts- und Workflow-Workern

Das Thema Erstellen einer einfachen Amazon SWF Anwendung bot eine vollständige Implementierung einer einfachen Workflow-Anwendung, die aus einer Registrierungsanwendung, einem Aktivitäts- und Workflow-Worker sowie einem Workflow-Starter bestand.

Worker-Klassen sind so konzipiert, dass sie kontinuierlich ausgeführt werden und nach Aufgaben suchen, die von Amazon SWF gesendet wurden, um Aktivitäten auszuführen oder Entscheidungen zurückzugeben. Sobald eine Umfrage angefordert wurde, wird der Abfragende Amazon SWF aufgezeichnet und versucht, ihm eine Aufgabe zuzuweisen.

Wenn der Workflow-Worker während einer langen Umfrage beendet wird, versucht er Amazon SWF möglicherweise trotzdem, eine Aufgabe an den abgebrochenen Mitarbeiter zu senden, was dazu führt, dass die Aufgabe verloren geht (bis die Aufgabe das Timeout erreicht).

Eine Möglichkeit zur Bewältigung dieser Situation besteht darin, zu warten, bis alle Long-Poll-Anforderungen zurückkehren, bevor der Worker beendet wird.

In diesem Thema schreiben wir den Aktivitäts-Worker von helloswf um und verwenden Java-Hooks für das Herunterfahren. So versuchen wir, den Aktivitäts-Worker ordnungsgemäß herunterzufahren.

Hier finden Sie den vollständigen Code:

import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.amazonaws.regions.Regions; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow; import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClientBuilder; import com.amazonaws.services.simpleworkflow.model.ActivityTask; import com.amazonaws.services.simpleworkflow.model.PollForActivityTaskRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskCompletedRequest; import com.amazonaws.services.simpleworkflow.model.RespondActivityTaskFailedRequest; import com.amazonaws.services.simpleworkflow.model.TaskList; public class ActivityWorkerWithGracefulShutdown { private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.standard().withRegion(Regions.DEFAULT_REGION).build(); private static final CountDownLatch waitForTermination = new CountDownLatch(1); private static volatile boolean terminate = false; private static String executeActivityTask(String input) throws Throwable { return "Hello, " + input + "!"; } public static void main(String[] args) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { terminate = true; System.out.println("Waiting for the current poll request" + " to return before shutting down."); waitForTermination.await(60, TimeUnit.SECONDS); } catch (InterruptedException e) { // ignore } } }); try { pollAndExecute(); } finally { waitForTermination.countDown(); } } public static void pollAndExecute() { while (!terminate) { System.out.println("Polling for an activity task from the tasklist '" + HelloTypes.TASKLIST + "' in the domain '" + HelloTypes.DOMAIN + "'."); ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest() .withDomain(HelloTypes.DOMAIN) .withTaskList(new TaskList().withName(HelloTypes.TASKLIST))); String taskToken = task.getTaskToken(); if (taskToken != null) { String result = null; Throwable error = null; try { System.out.println("Executing the activity task with input '" + task.getInput() + "'."); result = executeActivityTask(task.getInput()); } catch (Throwable th) { error = th; } if (error == null) { System.out.println("The activity task succeeded with result '" + result + "'."); swf.respondActivityTaskCompleted( new RespondActivityTaskCompletedRequest() .withTaskToken(taskToken) .withResult(result)); } else { System.out.println("The activity task failed with the error '" + error.getClass().getSimpleName() + "'."); swf.respondActivityTaskFailed( new RespondActivityTaskFailedRequest() .withTaskToken(taskToken) .withReason(error.getClass().getSimpleName()) .withDetails(error.getMessage())); } } } } }

In dieser Version wurde der Polling-Code aus der main-Funktion in der ursprünglichen Version in eine eigene Methode pollAndExecute verschoben.

Die main Funktion verwendet nun einen CountDownLatchin Verbindung mit einem Shutdown-Hook, damit der Thread bis zu 60 Sekunden wartet, nachdem seine Beendigung angefordert wurde, bevor der Thread beendet wird.