Task & TaskSchedule Java DSL
Instead of using the shell to create and launch tasks, you can use the Java-based DSL provided by the spring-cloud-dataflow-rest-client
module.
The Task and TaskScheduler Java DSL provide convenient wrappers around the DataFlowTemplate
class that enables creating, launching and scheduling tasks programmatically.
To get started, you need to add the following dependency to your project:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-rest-client</artifactId>
<version>2.8.0-SNAPSHOT</version>
</dependency>
The classes at the heart of the Java DSL are TaskBuilder
, Task
, TaskSchedule
, TaskScheduleBuilder
and DataFlowTemplate
.
The task DSL also utilizes few DataFlowTemplate classes such as TaskExecutionResource
, TaskExecutionStatus
, JobExecutionResource
and JobInstanceResource
.
The entry point is the builder
method on Task
and TaskSchedule
that takes an instance of a DataFlowTemplate
.
Obtain DataFlowOperations
Both the Task
and the TaskSchedule
DSL requires a valid DataFlowOperations
instance.
Spring Cloud Data Flow offers the DataFlowTemplate
as an implementation of the DataFlowOperations
interface.
To create an instance of a DataFlowTemplate
, you need to provide a URI
location of the Data Flow Server.
Spring Boot auto-configuration for DataFlowTemplate
is also available.
You can use the properties in DataFlowClientProperties to configure the connection to the Data Flow server.
Generally, you should start with the spring.cloud.dataflow.client.uri
property.
URI dataFlowUri = URI.create("http://localhost:9393");
DataFlowOperations dataFlowOperations = new DataFlowTemplate(dataFlowUri);
Task DSL Usage
You can create new Task
instance with the help of the TaskBuilder
class, returned from the Task.builder(dataFlowOperations)
method.
Consider the following example, which creates a new composed task:
dataFlowOperations.appRegistryOperations().importFromResource(
"https://dataflow.spring.io/task-maven-latest", true);
Task task = Task.builder(dataflowOperations)
.name("myComposedTask")
.definition("a: timestamp && b:timestamp")
.description("My Composed Task")
.build();
The build
method returns an instance of a Task
definition that represents a composed task that has been created but not launched.
The timestamp
used in the task definition refers to the task app name as registered in DataFlow.
In order to create and launch your tasks, you need to make sure that the corresponding apps have been registered in the Data Flow server first as shown here.
Attempting to launch a task that contains an unknown application throws an exception.
You can register your application by using the DataFlowOperations
, as follows:
dataFlowOperations.appRegistryOperations().importFromResource(
"https://dataflow.spring.io/task-maven-latest", true);
Instead of creating a new Task you can use the TaskBuilder
to retrieve an existing Task instances by name:
Optional<Task> task = Task.builder(dataflowOperations).findByName("myTask");
or to list all existing tasks:
List<Task> tasks = Task.builder(dataflowOperations).allTasks();
With the Task
instance, you have methods available to launch
or destroy
the task.
The following example launches the task:
long executionId = task.launch();
The executionId
is an unique Task execution identifier for the launched task.
The launch
method is overloaded to take a java.util.Map<String, String>
of launch properties and java.util.List<String>
of the command line arguments.
The tasks are asynchronously run. If your use case requires you to wait on task completion or other task state you can use the java concurrency utils or the Awaitility
library like this:
org.awaitility.Awaitility.await().until(
() -> task.executionStatus(executionId) == TaskExecutionStatus.COMPLETE);
The Task
instance provides executionStatus
, destroy
, and stop
methods to control and query the task.
The Collection<TaskExecutionResource> executions()
method list all TaskExecutionResource
s launched by the task. Use the executionId
to retrieve the TaskExecutionResource
for a specific execution (Optional<TaskExecutionResource> execution(long executionId)
).
Similarly the Collection<JobExecutionResource> jobExecutionResources()
and Collection<JobInstanceResource> jobInstanceResources()
would let you introspect any Spring Batch Jobs when the task uses such.
TaskSchedule DSL Usage
Consider the following example, which creates a new task and schedule it:
Task task = Task.builder(dataflowOperations)
.name("myTask")
.definition("timestamp")
.description("simple task")
.build();
TaskSchedule schedule = TaskSchedule.builder(dataFlowOperations)
.schedueName("mySchedule")
.task(task)
.build();
The TaskScheduleBuilder
class is returned from the TaskSchedule.builder(dataFlowOperations)
method.
The build
method returns an instance of a TaskSchedule
instance called mySchedule
configured with a schedule instance.
At this point the schedule has not been created.
Use the schedule() method to create the schedule:
schedule.schedule("56 20 ? * *", Collections.emptyMap());
and the unschedule()
to delete it:
schedule.unschedule();
The TaskScheduleBuilder
can be used to retrieve or all the existing schedulers:
Optional<TaskSchedule> retrievedSchedule =
taskScheduleBuilder.findByScheduleName(schedule.getScheduleName());
List<TaskSchedule> allSchedulesPerTask = taskScheduleBuilder.list(task);
Setting Deployment Properties
Using the DeploymentPropertiesBuilder
for Task
The launch(Map<String, String> properties, List<String> arguments)
method allows customization of how your tasks are launched.
We made it easier to create a map with properties by using a builder style, as well as creating static methods for some properties so you need not remember the name of such properties:
Map<String, String> taskLaunchProperties = new DeploymentPropertiesBuilder()
.memory("myTask", 512)
.put("app.timestamp.timestamp.format", "YYYY")
.build();
long executionId = task.launch(taskLaunchProperties, Collections.EMPTY_LIST);
Setting Properties for a TaskSchedule
Map<String,String> props = new HashMap<>();
props.put("app.timestamp.timestamp.format", "YYYY");
taskSchedule.schedule("*/1 * * * *", props);