Instead of using the shell to create and launch tasks, you can use the Java-based DSL provided by the spring-cloud-dataflow-rest-client module. The Task and TaskSchedule Java DSL classes provide convenient wrappers around the DataFlowTemplate class that enable creating, launching, and scheduling tasks programmatically.
To get started, you need to add the following dependency to your project:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-rest-client</artifactId>
    <version>2.8.0-SNAPSHOT</version>
</dependency>
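If your project uses Gradle instead of Maven, the equivalent dependency declaration (same coordinates and version as the Maven snippet above) would be:

```groovy
// Gradle equivalent of the Maven dependency above
implementation 'org.springframework.cloud:spring-cloud-dataflow-rest-client:2.8.0-SNAPSHOT'
```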
The classes at the heart of the Java DSL are Task and TaskSchedule, together with the TaskBuilder and TaskScheduleBuilder classes that create them. The task DSL also makes use of a few of the DataFlowTemplate operations, such as the AppRegistryOperations returned by appRegistryOperations(). The entry point is the builder method on Task and TaskSchedule, which takes an instance of a DataFlowOperations. Both the Task and the TaskSchedule DSL require a valid DataFlowOperations implementation. Spring Cloud Data Flow offers the DataFlowTemplate as an implementation of the
DataFlowOperations interface. To create an instance of a DataFlowTemplate, you need to provide the URI location of the Data Flow Server.
Spring Boot auto-configuration for DataFlowTemplate is also available. You can use the properties in DataFlowClientProperties to configure the connection to the Data Flow server. Generally, you should start with the server URI property.
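When relying on the Spring Boot auto-configuration, the connection can instead be set in application.properties. The keys below are an assumption based on the spring.cloud.dataflow.client prefix used by DataFlowClientProperties; verify them against the version you use:

```properties
# Assumed keys under the spring.cloud.dataflow.client prefix
# (check DataFlowClientProperties for your version)
spring.cloud.dataflow.client.server-uri=http://localhost:9393
# Optional basic authentication, if your Data Flow server requires it
#spring.cloud.dataflow.client.authentication.basic.username=user
#spring.cloud.dataflow.client.authentication.basic.password=password
```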
URI dataFlowUri = URI.create("http://localhost:9393");
DataFlowOperations dataFlowOperations = new DataFlowTemplate(dataFlowUri);
You can create a new Task instance with the help of the TaskBuilder class, which is returned from the Task.builder(dataFlowOperations) method. Consider the following example, which creates a new composed task:
Task task = Task.builder(dataFlowOperations)
        .name("myComposedTask")
        .definition("a: timestamp && b:timestamp")
        .description("My Composed Task")
        .build();
The build method returns an instance of a Task definition that represents a composed task that has been created but not launched. The timestamp name used in the task definition refers to the task application name as registered in Data Flow.
To create and launch your tasks, you first need to make sure that the corresponding apps have been registered in the Data Flow server. Attempting to launch a task that contains an unknown application throws an exception.
You can register your application by using the DataFlowOperations, as follows:
dataFlowOperations.appRegistryOperations().importFromResource(
        "https://dataflow.spring.io/task-maven-latest", true);
Instead of creating a new Task, you can use the TaskBuilder to retrieve an existing Task instance by name:
Optional<Task> task = Task.builder(dataFlowOperations).findByName("myTask");
or to list all existing tasks:
List<Task> tasks = Task.builder(dataFlowOperations).allTasks();
With a Task instance, you have methods available to launch or destroy the task.
The following example launches the task:
long executionId = task.launch();
The executionId is a unique task execution identifier for the launched task.
The launch method is overloaded to take a java.util.Map<String, String> of launch properties and a java.util.List<String> of command-line arguments.
Tasks run asynchronously. If your use case requires you to wait for task completion or for some other task state, you can use the Java concurrency utilities or the Awaitility library, like this:
org.awaitility.Awaitility.await().until(
        () -> task.executionStatus(executionId) == TaskExecutionStatus.COMPLETE);
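If you prefer not to add an Awaitility dependency, an equivalent polling helper can be written with the JDK alone. The Poller class below is a hypothetical utility (it is not part of the Data Flow client); the condition lambda would wrap the same task.executionStatus(executionId) check shown above:

```java
import java.util.function.BooleanSupplier;

// Hypothetical JDK-only helper: polls a condition until it is true or a timeout elapses.
final class Poller {

    static boolean await(BooleanSupplier condition, long timeoutMillis, long intervalMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false; // timed out before the condition became true
            }
            Thread.sleep(intervalMillis);
        }
        return true;
    }
}
```

You would then wait for completion with, for example, Poller.await(() -> task.executionStatus(executionId) == TaskExecutionStatus.COMPLETE, 60_000, 500).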
The Task instance provides methods (such as executionStatus, destroy, and stop) to control and query the task.
The Collection<TaskExecutionResource> executions() method lists all TaskExecutionResources launched by the task. Use the executionId to retrieve the TaskExecutionResource for a specific execution (Optional<TaskExecutionResource> execution(long executionId)).
The Collection<JobExecutionResource> jobExecutionResources() and Collection<JobInstanceResource> jobInstanceResources() methods let you introspect any Spring Batch jobs, if the task uses them.
Consider the following example, which creates a new task and schedules it:
Task task = Task.builder(dataFlowOperations)
        .name("myTask")
        .definition("timestamp")
        .description("simple task")
        .build();

TaskSchedule schedule = TaskSchedule.builder(dataFlowOperations)
        .scheduleName("mySchedule")
        .task(task)
        .build();
The TaskScheduleBuilder class is returned from the TaskSchedule.builder(dataFlowOperations) method. The build method returns a TaskSchedule instance called mySchedule that is configured for the given task. At this point, the schedule has not been created. Use the schedule() method to create it:
schedule.schedule("56 20 ? * *", Collections.emptyMap());
Use the unschedule() method to delete it:
schedule.unschedule();
The TaskScheduleBuilder can also be used to retrieve an existing schedule by name or to list all existing schedules:
Optional<TaskSchedule> retrievedSchedule =
        taskScheduleBuilder.findByScheduleName(schedule.getScheduleName());
List<TaskSchedule> allSchedulesPerTask = taskScheduleBuilder.list(task);
The launch(Map<String, String> properties, List<String> arguments) method allows customization of how your tasks are launched. To make building the properties map easier, the DeploymentPropertiesBuilder offers a fluent API, as well as static methods for common properties, so that you need not remember their names:
Map<String, String> taskLaunchProperties = new DeploymentPropertiesBuilder()
        .memory("myTask", 512)
        .put("app.timestamp.timestamp.format", "YYYY")
        .build();

long executionId = task.launch(taskLaunchProperties, Collections.emptyList());
Similarly, you can pass properties when creating a schedule:
Map<String, String> props = new HashMap<>();
props.put("app.timestamp.timestamp.format", "YYYY");
taskSchedule.schedule("*/1 * * * *", props);