In this guide, we develop three Spring Boot applications that use Spring Cloud Stream and deploy them to Cloud Foundry, to Kubernetes, and to your local machine. In another guide, we deploy these applications by using Data Flow. By deploying the applications manually, you get a better understanding of the steps that Data Flow automates for you.

The sections that follow describe how to build these applications from scratch.

If you prefer, you can download a zip file containing the sources for these applications, unzip it, build it, and proceed to the deployment section.

You can download the project that contains all three applications from your browser. You can also use the command line, as the following example shows:

wget 'https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/main/dataflow-website/stream-developer-guides/streams/standalone-stream-sample/dist/usage-cost-stream-sample.zip?raw=true' -O usage-cost-stream-sample.zip

Building the downloaded sample

The stream apps may be configured to run with a Kafka broker or RabbitMQ, using a common code base. The only difference is in the executable jar files. In order for them to work with a Kafka broker, they require a Kafka binder dependency (enabled by default). For RabbitMQ, they require the Rabbit binder.

To build the sample stream app for Kafka, from the root project directory:

$./mvnw clean package -Pkafka

To build the sample stream app for RabbitMQ, from the root project directory:

$./mvnw clean package -Prabbit

Development

We create three Spring Cloud Stream applications that communicate by using the configured binder.

The scenario is a cell phone company creating bills for its customers. Each call made by a user has a duration and an amount of data used during the call. As part of the process to generate a bill, the raw call data needs to be converted to a cost for the duration of the call and a cost for the amount of data used.

The call is modeled by using the UsageDetail class that contains the duration of the call and the amount of data used during the call. The bill is modeled by using the UsageCostDetail class that contains the cost of the call (costCall) and the cost of the data (costData). Each class contains an ID (userId) to identify the person making the call.

The three streaming applications are as follows:

  • The Source application (named UsageDetailSender) generates the users' call duration and the amount of data used for each userId and sends a message that contains the UsageDetail object as JSON.
  • The Processor application (named UsageCostProcessor) consumes the UsageDetail and computes the cost of the call and the cost of the data per userId. It sends the UsageCostDetail object as JSON.
  • The Sink application (named UsageCostLogger) consumes the UsageCostDetail object and logs the cost of the call and data.

Source

In this step, we create the UsageDetailSender source.

You can either directly download the project generated by the Spring Initialzr

or visit the Spring Initialzr site and follow these instructions:

  1. Create a new Maven project with a Group name of io.spring.dataflow.sample and an Artifact name of usage-detail-sender-kafka, and package io.spring.dataflow.sample.usagedetailsender.
  2. In the Dependencies text box, type Kafka to select the Kafka binder dependency,.
  3. In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
  4. In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
  5. Click the Generate Project button.

Now you should unzip the usage-detail-sender-kafka.zip file and import the project into your favorite IDE.

There are many configuration options that you can choose to extend or override to achieve the desired runtime behavior when using Kafka as the message broker. There are many configuration options that you can choose to extend or override to achieve the desired runtime behavior when using Kafka as the message broker. The Kafka binder documentation lists the Kafka binder configuration properties.

You can either directly download the project generated by the Spring Initialzr

or visit the Spring Initialzr site and follow these instructions:

  1. Create a new Maven project with a Group name of io.spring.dataflow.sample and an Artifact name of usage-detail-sender-rabbit, and package io.spring.dataflow.sample.usagedetailsender.
  2. In the Dependencies text box, type RabbitMQ to select the RabbitMQ binder dependency,.
  3. In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
  4. In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
  5. Click the Generate Project button.

Now you should unzip the usage-detail-sender-rabbit.zip file and import the project into your favorite IDE.

Durable Queues

By default, the Spring Cloud Stream consumer application creates an anonymous auto-delete queue. This can result in a message not being stored and forwarded by the producer if the producer application started before the consumer application. Even though the exchange is durable, we need a durable queue to be bound to the exchange for the message to be stored for later consumption. Hence, for guaranteed message delivery, you need a durable queue.

To pre-create durable queues and bind them to the exchange, the producer application should set the following property:

spring.cloud.stream.bindings.<channelName>.producer.requiredGroups

The requiredGroups property accepts a comma-separated list of groups to which the producer must ensure message delivery. When this property is set, a durable queue is created by using the <exchange>.<requiredGroup> format.

There are many configuration options that you can choose to extend or override to achieve the desired runtime behavior when using RabbitMQ as the message broker. The RabbitMQ Binder documentation lists the RabbitMQ binder configuration properties.

Business Logic

Now we can create the code required for this application. To do so:

  1. Create a UsageDetail class in the io.spring.dataflow.sample.usagedetailsender package that looks like the contents in UsageDetail.java. The UsageDetail class contains userId, data, and duration properties.
  2. Create the UsageDetailSender class in the io.spring.dataflow.sample.usagedetailsender package. It should resemble the following listing:
package io.spring.dataflow.sample.usagedetailsender;

import java.util.Random;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageDetailSender {

	private String[] users = {"user1", "user2", "user3", "user4", "user5"};

	@Bean
	public Supplier<UsageDetail> sendEvents() {
		return () -> {
			UsageDetail usageDetail = new UsageDetail();
			usageDetail.setUserId(this.users[new Random().nextInt(5)]);
			usageDetail.setDuration(new Random().nextInt(300));
			usageDetail.setData(new Random().nextInt(700));
			return usageDetail;
		};
	}
}

The sendEvents Supplier provides a UsageDetail object populated with random values. Spring Cloud Stream automatically binds this function to send its data to the configured output destation. Spring Cloud Stream also configures a default poller for any Supplier that will invoke the function every second, by default.

Configuration

When configuring the source application, we need to set the output binding destination (the name of the RabbitMQ exchange or Kafka topic) where the producer will publish the data.

For convenience, we alias the function output binding name, sendEvents-out-0, indicating the output corresponding to the first output parameter of the sendEvents function, to the logical name to output. Alternately, we can bind the output binding name directly: spring.cloud.stream.bindings.sendEvents-out-0.destination=usage-detail. See Functional Binding Names for a more detailed explanation.

In src/main/resources/application.properties, add the following properties:

spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

Building

Now we can build the Usage Detail Sender application.

From the usage-detail-sender root directory, use the following command to build the project with maven:

./mvnw clean package

Testing

Spring Cloud Stream provides a test jar to test the Spring Cloud Stream application:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
	<type>test-jar</type>
	<classifier>test-binder</classifier>
	<scope>test</scope>
</dependency>

Instead of a message broker binder implementation, TestChannelBinderConfiguration provides an in-memory binder implementation, used to trace and test your application's outbound and inbound messages. The test configuration includes InputDestination and OutputDestination beans to send and receive messages. To unit test the UsageDetailSender application, add the following code in the UsageDetailSenderApplicationTests class:

package io.spring.dataflow.sample.usagedetailsender;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageDetailSenderApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Test
	public void testUsageDetailSender() {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration
						.getCompleteConfiguration(UsageDetailSenderApplication.class))
				.web(WebApplicationType.NONE)
				.run()) {

			OutputDestination target = context.getBean(OutputDestination.class);
			Message<byte[]> sourceMessage = target.receive(10000, "usage-detail");

			MessageConverter converter = context.getBean(CompositeMessageConverter.class);
			UsageDetail usageDetail = (UsageDetail) converter
					.fromMessage(sourceMessage, UsageDetail.class);

			assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
			assertThat(usageDetail.getData()).isBetween(0L, 700L);
			assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
		}
	}
}
  • The contextLoads test case verifies that the application starts successfully.
  • The testUsageDetailSender test case uses the OutputDestination to receive and verify the messages sent by the UsageDetailSender.

The in-memory test binder behaves exactly as any of the message broker binder implementations. Notably, in a Spring Cloud Stream Application, the message payload is always a byte array, encoded as JSON by default. The consuming application receives the bytes on its input channel and automatically delegates to the appropriate MessageConverter, based on the content type, to convert the bytes to match the consuming Function's argument type, UsageDetail in this case. For the test, we need to perform this step explicitly. Alternately, instead of using a MessageConverter, we could have invoked a JSON parser directly.

Processor

In this step, we create the UsageCostProcessor processor.

You can either directly download the project generated by the Spring Initialzr

or visit the Spring Initialzr site and follow these instructions:

  1. Create a new Maven project with a Group name of io.spring.dataflow.sample and an Artifact name of usage-cost-processor-kafka, and package io.spring.dataflow.sample.usagecostprocessor.
  2. In the Dependencies text box, type Kafka to select the Kafka binder dependency,.
  3. In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
  4. In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
  5. Click the Generate Project button.

Now you should unzip the usage-cost-processor-kafka.zip file and import the project into your favorite IDE.

You can either directly download the project generated by the Spring Initialzr

or visit the Spring Initialzr site and follow these instructions:

  1. Create a new Maven project with a Group name of io.spring.dataflow.sample and an Artifact name of usage-cost-processor-rabbit, and package io.spring.dataflow.sample.usagecostprocessor.
  2. In the Dependencies text box, type RabbitMQ to select the RabbitMQ binder dependency,.
  3. In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
  4. In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
  5. Click the Generate Project button.

Now you should unzip the usage-cost-processor-rabbit.zip file and import the project into your favorite IDE.

Business Logic

Now we can create the code required for this application. To do so:

  1. Create the UsageDetail class in the io.spring.dataflow.sample.usagecostprocessor. Its contents resemble the contents of UsageDetail.java. The UsageDetail class contains userId, data, and duration properties
  2. Create the UsageCostDetail class in the io.spring.dataflow.sample.usagecostprocessor package. Its contents resemble the contents of UsageCostDetail.java. The UsageCostDetail class contains userId, callCost, and dataCost properties.
  3. Create the UsageCostProcessor class in the io.spring.dataflow.sample.usagecostprocessor package, which receives the UsageDetail message, computes the call and data cost, and sends a UsageCostDetail message. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostprocessor;

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostProcessor {

	private double ratePerSecond = 0.1;

	private double ratePerMB = 0.05;

	@Bean
	public Function<UsageDetail, UsageCostDetail> processUsageCost() {
		return usageDetail -> {
			UsageCostDetail usageCostDetail = new UsageCostDetail();
			usageCostDetail.setUserId(usageDetail.getUserId());
			usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
			usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
			return usageCostDetail;
		};
	}
}

In the preceding application, we declare a Function bean that accepts a UsageDetail and returns a UsageCostDetail. Spring Cloud Stream will discover this function and bind its input and output to destinations configured for the messaging middleware. As discussed in the previous section, Spring Cloud Stream uses the appropriate MessageConverter to perform the necessary type conversion.

Configuration

When configuring the processor application, we need to set the following properties:

  • The input binding destination (Kafka topic or RabbitMQ exchange) where this application is subscribed.
  • The output binding destination where the producer will publish the data.

For production applications, it is a good idea to set spring.cloud.stream.bindings.input.group to specify the consumer group to which this consumer application belongs. This ensures that additional consumer applications, each identified with its own group id, will receive every message. Each consumer group may scale to multiple instances to distribute the work load. Spring Cloud Stream abstracts this feature, native to Kafka, in order to extend it to RabbitMQ and other binder implementations.

In src/main/resources/application.properties, add the following properties:

spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

For convenience, we alias the function binding names, processUsageCost-in-0 and processUsageCost-out-0 to input and output, respectively.

  • The spring.cloud.stream.bindings.input.destination property binds the UsageCostProcessor object's input to the usage-detail destination.
  • The spring.cloud.stream.bindings.output.destination property binds the UsageCostProcessor object's output to the usage-cost destination.

The input destination must be the same as the output destination for the Source application. Likewise, the output destination must be the same as the input destination for the Sink, below.

Building

Now we can build the Usage Cost Processor application. From the usage-cost-processor riit directory, use the following command to build the project with maven:

./mvnw clean package

Testing

As mentioned above, Spring Cloud Stream provides a test jar to test the Spring Cloud Stream application:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
	<type>test-jar</type>
	<classifier>test-binder</classifier>
	<scope>test</scope>
</dependency>

TestChannelBinderConfiguration provides an in-memory binder implementation, used to trace and test your application's outbound and inbound messages. The test configuration includes InputDestination and OutputDestination beans to send and receive messages. To unit test the UsageCostProcessor application, add following code in the UsageCostProcessorApplicationTests class:

package io.spring.dataflow.sample.usagecostprocessor;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageCostProcessorApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Test
	public void testUsageCostProcessor() {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration.getCompleteConfiguration(
						UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
				.run()) {

			InputDestination source = context.getBean(InputDestination.class);

			UsageDetail usageDetail = new UsageDetail();
			usageDetail.setUserId("user1");
			usageDetail.setDuration(30L);
			usageDetail.setData(100L);

			MessageConverter converter = context.getBean(CompositeMessageConverter.class);
			Map<String, Object> headers = new HashMap<>();
			headers.put("contentType", "application/json");
			MessageHeaders messageHeaders = new MessageHeaders(headers);
			Message<?> message = converter.toMessage(usageDetail, messageHeaders);

			source.send(message);

			OutputDestination target = context.getBean(OutputDestination.class);
			Message<byte[]> sourceMessage = target.receive(10000, "usage-cost");

			UsageCostDetail usageCostDetail = (UsageCostDetail) converter
					.fromMessage(sourceMessage, UsageCostDetail.class);

			assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
			assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
		}
	}
}
  • The contextLoads test case verifies that the application starts successfully.
  • The testUsageCostProcessor test case uses the InputDestination to send a message and the OutputDestination to receive and verify the message.

Sink

In this step, we create the UsageCostLogger sink.

You can either directly download the project generated by the Spring Initialzr and click on Generate Project.

or visit the Spring Initialzr site and follow these instructions:

  1. Create a new Maven project with a Group name of io.spring.dataflow.sample and an Artifact name of usage-cost-logger-kafka, and package io.spring.dataflow.sample.usagecostlogger.
  2. In the Dependencies text box, type Kafka to select the Kafka binder dependency.
  3. In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
  4. In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
  5. Click on Generate Project.

Now you should unzip the usage-cost-logger-kafka.zip file and import the project into your favorite IDE.

You can either directly download the project generated by the Spring Initialzr and click on Generate Project.

or visit the Spring Initialzr site and follow these instructions:

  1. Create a new Maven project with a Group name of io.spring.dataflow.sample and an Artifact name of usage-cost-logger-rabbit, and package io.spring.dataflow.sample.usagecostlogger.
  2. In the Dependencies text box, type RabbitMQ to select the RabbitMQ binder dependency.
  3. In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
  4. In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
  5. Click on Generate Project.

Now you should unzip the usage-cost-logger-rabbit.zip file and import the project into your favorite IDE.

Business Logic

To create the business logic:

  1. Create a UsageCostDetail class in the io.spring.dataflow.sample.usagecostlogger package. Its contents should resemble the contents of UsageCostDetail.java. The UsageCostDetail class contains userId, callCost, and dataCost properties.
  2. Create the UsageCostLogger class in the io.spring.dataflow.sample.usagecostlogger package, which receives the UsageCostDetail message and logs it. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostlogger;

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostLogger {

	private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);

	@Bean
	public Consumer<UsageCostDetail> process() {
		return usageCostDetail -> {
			logger.info(usageCostDetail.toString());
		};
	}
}

In the preceding application, we declare a Consumer bean that accepts a UsageCostDetail. Spring Cloud Stream will discover this function and bind its input to the input destination configured for the messaging middleware. As discussed in the previous section, Spring Cloud Stream uses the appropriate MessageConverter to perform the necessary type conversion before invoking this Consumer.

Configuration

When configuring the sink application, we need to set:

  • The input binding destination (Kafka topic or RabbitMQ exchange) where this application is subscribed.
  • Optionally, the group to specify the consumer group to which this consumer application belongs.

For convenience, we alias the function binding name process-in-0 to input.

In src/main/resources/application.properties, add the following properties:

spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

Building

Now we can build the Usage Cost Logger application. From the usage-cost-logger's root directory, use the following command to build the project with Maven:

./mvnw clean package

Testing

To unit test the UsageCostLogger, add the following code in the UsageCostLoggerApplicationTests class:

package io.spring.dataflow.sample.usagecostlogger;

import java.util.HashMap;
import java.util.Map;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Test
	public void testUsageCostLogger(CapturedOutput output) {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration
						.getCompleteConfiguration(UsageCostLoggerApplication.class))
				.web(WebApplicationType.NONE)
				.run()) {

			InputDestination source = context.getBean(InputDestination.class);

			UsageCostDetail usageCostDetail = new UsageCostDetail();
			usageCostDetail.setUserId("user1");
			usageCostDetail.setCallCost(3.0);
			usageCostDetail.setDataCost(5.0);

			MessageConverter converter = context.getBean(CompositeMessageConverter.class);
			Map<String, Object> headers = new HashMap<>();
			headers.put("contentType", "application/json");
			MessageHeaders messageHeaders = new MessageHeaders(headers);
			Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);

			source.send(message);

			Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
		}
	}
}

Add the awaitility dependency in pom.xml:

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
  • The contextLoads test case verifies that the application starts successfully.
  • The testUsageCostLogger test case verifies that the process method of UsageCostLogger is invoked by using OutputCaptureExtension from Spring Boot's test framework.

Deployment

The next step is to deploy these applications to one of the supported platforms, using the message broker you configured for these applicactions.