Task & TaskSchedule Java DSL
Instead of using the shell to create and launch tasks, you can use the Java-based DSL provided by the spring-cloud-dataflow-rest-client module.
The Java DSL for Task and TaskSchedule provides convenient wrappers around the DataFlowTemplate class that enable creating, launching, and scheduling tasks programmatically.
To get started, you need to add the following dependency to your project:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-rest-client</artifactId>
    <version>2.10.2</version>
</dependency>
The classes at the heart of the Java DSL are TaskBuilder, Task, TaskSchedule, TaskScheduleBuilder, and DataFlowTemplate.
The task DSL also uses a few DataFlowTemplate classes, such as TaskExecutionResource, TaskExecutionStatus, JobExecutionResource, and JobInstanceResource.
The entry point is the builder method on Task or TaskSchedule, which takes an instance of a DataFlowTemplate.
Obtain a DataFlowOperations Instance
The DSL for both the Task and the TaskSchedule requires a valid DataFlowOperations instance. Spring Cloud Data Flow offers the DataFlowTemplate as an implementation of the DataFlowOperations interface.
To create an instance of a DataFlowTemplate, you need to provide a URI location of the Data Flow server.
Spring Boot auto-configuration for DataFlowTemplate is also available. You can use the properties in DataFlowClientProperties to configure the connection to the Data Flow server. Generally, you should start with the spring.cloud.dataflow.client.server-uri property:
URI dataFlowUri = URI.create("http://localhost:9393");
DataFlowOperations dataflowOperations = new DataFlowTemplate(dataFlowUri);
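If you run inside a Spring Boot application, the following is a minimal sketch of the property-driven alternative. It assumes that the auto-configuration shipped with spring-cloud-dataflow-rest-client contributes a DataFlowOperations bean once the module is on the classpath and spring.cloud.dataflow.client.server-uri is set; the TaskDslRunner class name is only an illustration.
// application.properties (assumption: read by the client auto-configuration)
// spring.cloud.dataflow.client.server-uri=http://localhost:9393

import org.springframework.cloud.dataflow.rest.client.DataFlowOperations;
import org.springframework.stereotype.Component;

// Hypothetical component that receives the auto-configured DataFlowOperations bean
@Component
public class TaskDslRunner {

    private final DataFlowOperations dataFlowOperations;

    public TaskDslRunner(DataFlowOperations dataFlowOperations) {
        this.dataFlowOperations = dataFlowOperations;
    }
}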
Task DSL Usage
You can create new Task instances with the help of the TaskBuilder class, which is returned from the Task.builder(dataFlowOperations) method.
Consider the following example, which creates a new composed task:
// Register the task applications (timestamp) before referencing them in a definition
dataflowOperations.appRegistryOperations().importFromResource(
        "https://dataflow.spring.io/task-maven-latest", true);

// Create (but do not launch) the composed task definition
Task task = Task.builder(dataflowOperations)
        .name("myComposedTask")
        .definition("a: timestamp && b:timestamp")
        .description("My Composed Task")
        .build();
The build method returns an instance of a Task definition that represents a composed task that has been created but not launched. The timestamp used in the task definition refers to the task application name, as registered in Data Flow.
To create and launch your tasks, you need to make sure that the corresponding apps have been registered in the Data Flow server first, as shown in the Batch Developer Guide. Attempting to launch a task that contains an unknown application throws an exception.
You can register your applications by using the DataFlowOperations instance, as follows:
dataflowOperations.appRegistryOperations().importFromResource(
        "https://dataflow.spring.io/task-maven-latest", true);
Instead of creating a new Task, you can use the TaskBuilder to retrieve an existing Task instance by name:
Optional<Task> task = Task.builder(dataflowOperations).findByName("myComposedTask");
You can also list all existing tasks:
List<Task> tasks = Task.builder(dataflowOperations).allTasks();
With the Task instance, you have methods available to launch or destroy the task.
The following example launches the task:
long executionId = task.launch();
The executionId is a unique task execution identifier for the launched task.
The launch method is overloaded to take a java.util.Map<String, String> of launch properties and a java.util.List<String> of command line arguments.
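For example, here is a minimal sketch of the overloaded form, assuming the usual java.util imports; the property key is taken from the later examples in this section, and the command line argument is only a hypothetical placeholder:
// Launch with deployment properties and command line arguments
Map<String, String> properties =
        Collections.singletonMap("app.timestamp.timestamp.format", "YYYY");
List<String> arguments = Arrays.asList("--my.argument=value"); // hypothetical argument
long executionId = task.launch(properties, arguments);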
Tasks run asynchronously. If your use case requires you to wait on task completion or on another task state, you can use the Java concurrency utilities or the Awaitility library, as follows:
org.awaitility.Awaitility.await().until(
        () -> task.executionStatus(executionId) == TaskExecutionStatus.COMPLETE);
The Task instance provides executionStatus, destroy, and stop methods to control and query the task.
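For instance, a short sketch that uses two of these methods (executionStatus was already shown above; destroy removes the task definition):
// Query the status of a specific execution
TaskExecutionStatus status = task.executionStatus(executionId);

// Delete the task definition once you no longer need it
task.destroy();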
The Collection<TaskExecutionResource> executions() method lists all TaskExecutionResource instances launched by the task. You can use the executionId to retrieve the TaskExecutionResource for a specific execution (Optional<TaskExecutionResource> execution(long executionId)).
Similarly, the Collection<JobExecutionResource> jobExecutionResources() and Collection<JobInstanceResource> jobInstanceResources() methods let you introspect any Spring Batch jobs when the task uses them.
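As an illustration, the following sketch inspects past executions. The getExitCode(), getExecutionId(), and getTaskExecutionStatus() accessors on TaskExecutionResource are assumptions here, not taken from the text above:
// Look up a single execution by its identifier
Optional<TaskExecutionResource> execution = task.execution(executionId);
execution.ifPresent(e -> System.out.println("Exit code: " + e.getExitCode()));

// List every execution launched from this task definition
for (TaskExecutionResource e : task.executions()) {
    System.out.println(e.getExecutionId() + " -> " + e.getTaskExecutionStatus());
}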
TaskSchedule DSL Usage
Consider the following example, which creates a new task and schedules it:
Task task = Task.builder(dataflowOperations)
        .name("myTask")
        .definition("timestamp")
        .description("simple task")
        .build();

TaskSchedule schedule = TaskSchedule.builder(dataflowOperations)
        .scheduleName("mySchedule")
        .task(task)
        .build();
The TaskSchedule.builder(dataFlowOperations) method returns a TaskScheduleBuilder.
The build method returns a TaskSchedule instance named mySchedule that is associated with the task. At this point, the schedule has not been created.
You can use the schedule() method to create the schedule:
schedule.schedule("56 20 ? * *", Collections.emptyMap());
You can also use the unschedule() method to delete it:
schedule.unschedule();
You can use the TaskScheduleBuilder to retrieve one or all of the existing schedules:
TaskScheduleBuilder taskScheduleBuilder = TaskSchedule.builder(dataflowOperations);

Optional<TaskSchedule> retrievedSchedule =
        taskScheduleBuilder.findByScheduleName(schedule.getScheduleName());
List<TaskSchedule> allSchedulesPerTask = taskScheduleBuilder.list(task);
Setting Deployment Properties
This section covers how to set deployment properties for Task
and TaskScheduler
.
Using the DeploymentPropertiesBuilder for Task
The launch(Map<String, String> properties, List<String> arguments) method lets you customize how your tasks are launched.
We made it easier to create a map of properties by providing a builder style and static methods for some properties, so that you need not remember the property names:
Map<String, String> taskLaunchProperties = new DeploymentPropertiesBuilder()
        .memory("myTask", 512)
        .put("app.timestamp.timestamp.format", "YYYY")
        .build();

Optional<Task> task = Task.builder(dataflowOperations).findByName("myTask");
if (task.isPresent()) {
    long executionId = task.get().launch(taskLaunchProperties, Collections.emptyList());
}
Setting Properties for TaskSchedule
Similarly, you can set deployment properties for a TaskSchedule instance:
Map<String, String> props = new HashMap<>();
props.put("app.timestamp.timestamp.format", "YYYY");
taskSchedule.schedule("*/1 * * * *", props);