Task & TaskSchedule Java DSL

Instead of using the shell to create and launch tasks, you can use the Java-based DSL provided by the spring-cloud-dataflow-rest-client module. The Task and TaskSchedule Java DSLs provide convenient wrappers around the DataFlowTemplate class that enable you to create, launch, and schedule tasks programmatically.

To get started, you need to add the following dependency to your project:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-dataflow-rest-client</artifactId>
	<version>2.7.0</version>
</dependency>

The classes at the heart of the Java DSL are TaskBuilder, Task, TaskSchedule, TaskScheduleBuilder, and DataFlowTemplate. The task DSL also uses a few DataFlowTemplate classes, such as TaskExecutionResource, TaskExecutionStatus, JobExecutionResource, and JobInstanceResource.

The entry point is the builder method on Task and TaskSchedule that takes an instance of a DataFlowTemplate.

Obtain DataFlowOperations

Both the Task and the TaskSchedule DSL require a valid DataFlowOperations instance. Spring Cloud Data Flow offers the DataFlowTemplate as an implementation of the DataFlowOperations interface.

To create an instance of a DataFlowTemplate, you need to provide a URI location of the Data Flow Server. Spring Boot auto-configuration for DataFlowTemplate is also available. You can use the properties in DataFlowClientProperties to configure the connection to the Data Flow server. Generally, you should start with the spring.cloud.dataflow.client.uri property.

URI dataFlowUri = URI.create("http://localhost:9393");
DataFlowOperations dataFlowOperations = new DataFlowTemplate(dataFlowUri);

Task DSL Usage

You can create a new Task instance with the TaskBuilder class, which is returned by the Task.builder(dataFlowOperations) method.

Consider the following example, which creates a new composed task:

dataFlowOperations.appRegistryOperations().importFromResource(
                     "https://dataflow.spring.io/task-maven-latest", true);

Task task = Task.builder(dataFlowOperations)
              .name("myComposedTask")
              .definition("a: timestamp && b: timestamp")
              .description("My Composed Task")
              .build();

The build method returns an instance of a Task definition that represents a composed task that has been created but not launched. The timestamp used in the task definition refers to the task app name as registered in Data Flow.

Before you create and launch tasks, make sure that the corresponding apps have been registered in the Data Flow server.

Attempting to launch a task that contains an unknown application throws an exception. You can register your application by using the DataFlowOperations, as follows:

dataFlowOperations.appRegistryOperations().importFromResource(
            "https://dataflow.spring.io/task-maven-latest", true);

Instead of creating a new Task, you can use the TaskBuilder to retrieve an existing Task instance by name:

Optional<Task> task = Task.builder(dataFlowOperations).findByName("myTask");

You can also list all existing tasks:

List<Task> tasks = Task.builder(dataFlowOperations).allTasks();

With the Task instance, you have methods available to launch or destroy the task. The following example launches the task:

long executionId = task.launch();

The executionId is a unique Task execution identifier for the launched task. The launch method is overloaded to take a java.util.Map<String, String> of launch properties and a java.util.List<String> of command-line arguments.

Tasks run asynchronously. If your use case requires you to wait for task completion or another task state, you can use the Java concurrency utilities or the Awaitility library, as follows:

org.awaitility.Awaitility.await().until(
  () -> task.executionStatus(executionId) == TaskExecutionStatus.COMPLETE);
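If you would rather not add Awaitility, the same wait can be sketched with plain JDK classes. The following is a minimal illustration, not part of the Data Flow client: the TaskWait class and its waitUntil method are hypothetical names introduced here, and the timeout and poll interval are values you would choose for your own tasks.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not part of spring-cloud-dataflow-rest-client).
final class TaskWait {

    private TaskWait() {
    }

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met, false on timeout or if the
     * waiting thread was interrupted.
     *
     * Intended use with the Task DSL (assumes task and executionId exist):
     *   TaskWait.waitUntil(
     *       () -> task.executionStatus(executionId) == TaskExecutionStatus.COMPLETE,
     *       Duration.ofMinutes(5), Duration.ofSeconds(2));
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false; // timed out before the condition became true
            }
            try {
                // Sleep for one poll interval, but never past the deadline.
                TimeUnit.NANOSECONDS.sleep(Math.min(remaining, pollInterval.toNanos()));
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can react to it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Unlike the Awaitility call, this sketch returns false on timeout instead of throwing, so the caller decides whether a task that has not completed in time is an error.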

The Task instance provides executionStatus, destroy, and stop methods to control and query the task.

The Collection<TaskExecutionResource> executions() method lists all TaskExecutionResources launched by the task. Use the executionId to retrieve the TaskExecutionResource for a specific execution (Optional<TaskExecutionResource> execution(long executionId)).

Similarly, the Collection<JobExecutionResource> jobExecutionResources() and Collection<JobInstanceResource> jobInstanceResources() methods let you inspect any Spring Batch jobs that the task runs.

TaskSchedule DSL Usage

Consider the following example, which creates a new task and schedules it:

Task task = Task.builder(dataFlowOperations)
              .name("myTask")
              .definition("timestamp")
              .description("simple task")
              .build();

TaskSchedule schedule = TaskSchedule.builder(dataFlowOperations)
              .scheduleName("mySchedule")
              .task(task)
              .build();

The TaskScheduleBuilder class is returned from the TaskSchedule.builder(dataFlowOperations) method.

The build method returns a TaskSchedule instance called mySchedule that is configured with the task instance. At this point, the schedule has not been created.

Use the schedule() method to create the schedule:

schedule.schedule("56 20 ? * *", Collections.emptyMap());

Use the unschedule() method to delete it:

schedule.unschedule();

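The TaskScheduleBuilder can be used to retrieve one or all of the existing schedules:

// Obtain the builder once and reuse it for the lookups below.
TaskScheduleBuilder taskScheduleBuilder = TaskSchedule.builder(dataFlowOperations);

Optional<TaskSchedule> retrievedSchedule =
          taskScheduleBuilder.findByScheduleName(schedule.getScheduleName());

List<TaskSchedule> allSchedulesPerTask = taskScheduleBuilder.list(task);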

Setting Deployment Properties

Using the DeploymentPropertiesBuilder for Task

The launch(Map<String, String> properties, List<String> arguments) method allows customization of how your tasks are launched. We made it easier to create a map with properties by using a builder style, as well as creating static methods for some properties so you need not remember the name of such properties:

Map<String, String> taskLaunchProperties = new DeploymentPropertiesBuilder()
		.memory("myTask", 512)
		.put("app.timestamp.timestamp.format", "YYYY")
		.build();

long executionId = task.launch(taskLaunchProperties, Collections.emptyList());
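Because launch accepts an ordinary Map<String, String>, the builder is a convenience rather than a requirement. The sketch below builds what is probably an equivalent map by hand. It assumes that memory(label, value) expands to a key of the form deployer.<label>.memory, which follows Data Flow's deployer property naming but is not stated in this section; verify the key against your version before relying on it. The LaunchPropertiesSketch class and its method are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class (not part of the Data Flow client).
final class LaunchPropertiesSketch {

    private LaunchPropertiesSketch() {
    }

    // Builds launch properties by hand for the given app label and memory value.
    static Map<String, String> launchProperties(String label, int memory) {
        Map<String, String> props = new LinkedHashMap<>();
        // Assumed expansion of DeploymentPropertiesBuilder.memory(label, memory).
        props.put("deployer." + label + ".memory", Integer.toString(memory));
        // Application property, stored as-is like the builder's put(key, value).
        props.put("app.timestamp.timestamp.format", "YYYY");
        return props;
    }
}
```

The resulting map can be passed to task.launch(properties, arguments) in the same way as the map returned by the builder.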

Setting Properties for a TaskSchedule

Map<String,String> props = new HashMap<>();
props.put("app.timestamp.timestamp.format", "YYYY");
schedule.schedule("*/1 * * * *", props);