In this guide, we develop three Spring Boot applications that use Spring Cloud Stream and deploy them to Cloud Foundry, to Kubernetes, and to your local machine. In another guide, we deploy these applications by using Data Flow. By deploying the applications manually, you get a better understanding of the steps that Data Flow automates for you.
The sections that follow describe how to build these applications from scratch.
If you prefer, you can download a zip file containing the sources for these applications, unzip it, build it, and proceed to the deployment section.
You can download the project that contains all three applications from your browser. You can also use the command line, as the following example shows:
wget 'https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/main/dataflow-website/stream-developer-guides/streams/standalone-stream-sample/dist/usage-cost-stream-sample.zip?raw=true' -O usage-cost-stream-sample.zipBuilding the downloaded sample
The stream apps may be configured to run with a Kafka broker or RabbitMQ, using a common code base. The only difference is in the executable jar files. In order for them to work with a Kafka broker, they require a Kafka binder dependency (enabled by default). For RabbitMQ, they require the Rabbit binder.
To build the sample stream app for Kafka, from the root project directory:
$./mvnw clean package -PkafkaTo build the sample stream app for RabbitMQ, from the root project directory:
$./mvnw clean package -PrabbitDevelopment
We create three Spring Cloud Stream applications that communicate by using the configured binder.
The scenario is a cell phone company creating bills for its customers.
Each call made by a user has a duration and an amount of data used during the call.
As part of the process to generate a bill, the raw call data needs to be converted to a cost for the duration of the call and a cost for the amount of data used.
The call is modeled by using the UsageDetail class that contains the duration of the call and the amount of data used during the call.
The bill is modeled by using the UsageCostDetail class that contains the cost of the call (costCall) and the cost of the data (costData). Each class contains an ID (userId) to identify the person making the call.
The three streaming applications are as follows:
- The
Sourceapplication (namedUsageDetailSender) generates the users' calldurationand the amount ofdataused for eachuserIdand sends a message that contains theUsageDetailobject as JSON. - The
Processorapplication (namedUsageCostProcessor) consumes theUsageDetailand computes the cost of the call and the cost of the data peruserId. It sends theUsageCostDetailobject as JSON. - The
Sinkapplication (namedUsageCostLogger) consumes theUsageCostDetailobject and logs the cost of the call and data.
Source
In this step, we create the UsageDetailSender source.
You can either directly download the project generated by the Spring Initialzr
or visit the Spring Initialzr site and follow these instructions:
- Create a new Maven project with a Group name of
io.spring.dataflow.sampleand an Artifact name ofusage-detail-sender-kafka, and packageio.spring.dataflow.sample.usagedetailsender. - In the Dependencies text box, type
Kafkato select the Kafka binder dependency,. - In the Dependencies text box, type
Cloud Streamto select the Spring Cloud Stream dependency. - In the Dependencies text box, type
Actuatorto select the Spring Boot actuator dependency. - Click the Generate Project button.
Now you should unzip the usage-detail-sender-kafka.zip file and import the project into your favorite IDE.
There are many configuration options that you can choose to extend or override to achieve the desired runtime behavior when using Kafka as the message broker. There are many configuration options that you can choose to extend or override to achieve the desired runtime behavior when using Kafka as the message broker. The Kafka binder documentation lists the Kafka binder configuration properties.
You can either directly download the project generated by the Spring Initialzr
or visit the Spring Initialzr site and follow these instructions:
- Create a new Maven project with a Group name of
io.spring.dataflow.sampleand an Artifact name ofusage-detail-sender-rabbit, and packageio.spring.dataflow.sample.usagedetailsender. - In the Dependencies text box, type
RabbitMQto select the RabbitMQ binder dependency,. - In the Dependencies text box, type
Cloud Streamto select the Spring Cloud Stream dependency. - In the Dependencies text box, type
Actuatorto select the Spring Boot actuator dependency. - Click the Generate Project button.
Now you should unzip the usage-detail-sender-rabbit.zip file and import the project into your favorite IDE.
Durable Queues
By default, the Spring Cloud Stream consumer application creates an anonymous auto-delete queue.
This can result in a message not being stored and forwarded by the producer if the producer application started before the consumer application.
Even though the exchange is durable, we need a durable queue to be bound to the exchange for the message to be stored for later consumption.
Hence, for guaranteed message delivery, you need a durable queue.
To pre-create durable queues and bind them to the exchange, the producer application should set the following property:
spring.cloud.stream.bindings.<channelName>.producer.requiredGroupsThe requiredGroups property accepts a comma-separated list of groups to which the producer must ensure message delivery.
When this property is set, a durable queue is created by using the <exchange>.<requiredGroup> format.
There are many configuration options that you can choose to extend or override to achieve the desired runtime behavior when using RabbitMQ as the message broker. The RabbitMQ Binder documentation lists the RabbitMQ binder configuration properties.
Business Logic
Now we can create the code required for this application. To do so:
- Create a
UsageDetailclass in theio.spring.dataflow.sample.usagedetailsenderpackage that looks like the contents in UsageDetail.java. TheUsageDetailclass containsuserId,data, anddurationproperties. - Create the
UsageDetailSenderclass in theio.spring.dataflow.sample.usagedetailsenderpackage. It should resemble the following listing:
package io.spring.dataflow.sample.usagedetailsender;
import java.util.Random;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageDetailSender {
private String[] users = {"user1", "user2", "user3", "user4", "user5"};
@Bean
public Supplier<UsageDetail> sendEvents() {
return () -> {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
return usageDetail;
};
}
}The sendEvents Supplier provides a UsageDetail object populated with random values. Spring Cloud Stream automatically binds this function to send its data to the configured output destation. Spring Cloud Stream also configures a default poller for any Supplier that will invoke the function every second, by default.
Configuration
When configuring the source application, we need to set the output binding destination (the name of the RabbitMQ exchange or Kafka topic) where the producer will publish the data.
For convenience, we alias the function output binding name, sendEvents-out-0, indicating the output corresponding to the first output parameter of the sendEvents function, to the logical name to output. Alternately, we can bind the output binding name directly: spring.cloud.stream.bindings.sendEvents-out-0.destination=usage-detail. See Functional Binding Names for a more detailed explanation.
In src/main/resources/application.properties, add the following properties:
spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0Building
Now we can build the Usage Detail Sender application.
From the usage-detail-sender root directory, use the following command to build the project with maven:
./mvnw clean packageTesting
Spring Cloud Stream provides a test jar to test the Spring Cloud Stream application:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>Instead of a message broker binder implementation, TestChannelBinderConfiguration provides an in-memory binder implementation, used to trace and test your application's outbound and inbound messages.
The test configuration includes InputDestination and OutputDestination beans to send and receive messages.
To unit test the UsageDetailSender application, add the following code in the UsageDetailSenderApplicationTests class:
package io.spring.dataflow.sample.usagedetailsender;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
public class UsageDetailSenderApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageDetailSender() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageDetailSenderApplication.class))
.web(WebApplicationType.NONE)
.run()) {
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000, "usage-detail");
MessageConverter converter = context.getBean(CompositeMessageConverter.class);
UsageDetail usageDetail = (UsageDetail) converter
.fromMessage(sourceMessage, UsageDetail.class);
assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
assertThat(usageDetail.getData()).isBetween(0L, 700L);
assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
}
}
}- The
contextLoadstest case verifies that the application starts successfully. - The
testUsageDetailSendertest case uses theOutputDestinationto receive and verify the messages sent by theUsageDetailSender.
The in-memory test binder behaves exactly as any of the message broker binder implementations. Notably, in a Spring Cloud Stream Application, the message payload is always a byte array, encoded as JSON by default. The consuming application receives the bytes on its input channel and automatically delegates to the appropriate MessageConverter, based on the content type, to convert the bytes to match the consuming Function's argument type, UsageDetail in this case. For the test, we need to perform this step explicitly. Alternately, instead of using a MessageConverter, we could have invoked a JSON parser directly.
Processor
In this step, we create the UsageCostProcessor processor.
You can either directly download the project generated by the Spring Initialzr
or visit the Spring Initialzr site and follow these instructions:
- Create a new Maven project with a Group name of
io.spring.dataflow.sampleand an Artifact name ofusage-cost-processor-kafka, and packageio.spring.dataflow.sample.usagecostprocessor. - In the Dependencies text box, type
Kafkato select the Kafka binder dependency,. - In the Dependencies text box, type
Cloud Streamto select the Spring Cloud Stream dependency. - In the Dependencies text box, type
Actuatorto select the Spring Boot actuator dependency. - Click the Generate Project button.
Now you should unzip the usage-cost-processor-kafka.zip file and import the project into your favorite IDE.
You can either directly download the project generated by the Spring Initialzr
or visit the Spring Initialzr site and follow these instructions:
- Create a new Maven project with a Group name of
io.spring.dataflow.sampleand an Artifact name ofusage-cost-processor-rabbit, and packageio.spring.dataflow.sample.usagecostprocessor. - In the Dependencies text box, type
RabbitMQto select the RabbitMQ binder dependency,. - In the Dependencies text box, type
Cloud Streamto select the Spring Cloud Stream dependency. - In the Dependencies text box, type
Actuatorto select the Spring Boot actuator dependency. - Click the Generate Project button.
Now you should unzip the usage-cost-processor-rabbit.zip file and import the project into your favorite IDE.
Business Logic
Now we can create the code required for this application. To do so:
- Create the
UsageDetailclass in theio.spring.dataflow.sample.usagecostprocessor. Its contents resemble the contents of UsageDetail.java. TheUsageDetailclass containsuserId,data, anddurationproperties - Create the
UsageCostDetailclass in theio.spring.dataflow.sample.usagecostprocessorpackage. Its contents resemble the contents of UsageCostDetail.java. TheUsageCostDetailclass containsuserId,callCost, anddataCostproperties. - Create the
UsageCostProcessorclass in theio.spring.dataflow.sample.usagecostprocessorpackage, which receives theUsageDetailmessage, computes the call and data cost, and sends aUsageCostDetailmessage. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageCostProcessor {
private double ratePerSecond = 0.1;
private double ratePerMB = 0.05;
@Bean
public Function<UsageDetail, UsageCostDetail> processUsageCost() {
return usageDetail -> {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
};
}
}In the preceding application, we declare a Function bean that accepts a UsageDetail and returns a UsageCostDetail. Spring Cloud Stream will discover this function and bind its input and output to destinations configured for the messaging middleware. As discussed in the previous section, Spring Cloud Stream uses the appropriate MessageConverter to perform the necessary type conversion.
Configuration
When configuring the processor application, we need to set the following properties:
- The
inputbinding destination (Kafka topic or RabbitMQ exchange) where this application is subscribed. - The
outputbinding destination where the producer will publish the data.
For production applications, it is a good idea to set spring.cloud.stream.bindings.input.group to specify the consumer group to which this consumer application belongs. This ensures that additional consumer applications, each identified with its own group id, will receive every message. Each consumer group may scale to multiple instances to distribute the work load. Spring Cloud Stream abstracts this feature, native to Kafka, in order to extend it to RabbitMQ and other binder implementations.
In src/main/resources/application.properties, add the following properties:
spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0For convenience, we alias the function binding names, processUsageCost-in-0 and processUsageCost-out-0 to input and output, respectively.
- The
spring.cloud.stream.bindings.input.destinationproperty binds theUsageCostProcessorobject'sinputto theusage-detaildestination. - The
spring.cloud.stream.bindings.output.destinationproperty binds theUsageCostProcessorobject's output to theusage-costdestination.
The input destination must be the same as the output destination for the Source application. Likewise, the output destination must be the same as the input destination for the Sink, below.
Building
Now we can build the Usage Cost Processor application.
From the usage-cost-processor riit directory, use the following command to build the project with maven:
./mvnw clean packageTesting
As mentioned above, Spring Cloud Stream provides a test jar to test the Spring Cloud Stream application:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>TestChannelBinderConfiguration provides an in-memory binder implementation, used to trace and test your application's outbound and inbound messages.
The test configuration includes InputDestination and OutputDestination beans to send and receive messages.
To unit test the UsageCostProcessor application, add following code in the UsageCostProcessorApplicationTests class:
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
public class UsageCostProcessorApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageCostProcessor() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
.run()) {
InputDestination source = context.getBean(InputDestination.class);
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId("user1");
usageDetail.setDuration(30L);
usageDetail.setData(100L);
MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message<?> message = converter.toMessage(usageDetail, messageHeaders);
source.send(message);
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000, "usage-cost");
UsageCostDetail usageCostDetail = (UsageCostDetail) converter
.fromMessage(sourceMessage, UsageCostDetail.class);
assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
}
}
}- The
contextLoadstest case verifies that the application starts successfully. - The
testUsageCostProcessortest case uses theInputDestinationto send a message and theOutputDestinationto receive and verify the message.
Sink
In this step, we create the UsageCostLogger sink.
You can either directly download the project generated by the Spring Initialzr and click on Generate Project.
or visit the Spring Initialzr site and follow these instructions:
- Create a new Maven project with a Group name of
io.spring.dataflow.sampleand an Artifact name ofusage-cost-logger-kafka, and packageio.spring.dataflow.sample.usagecostlogger. - In the Dependencies text box, type
Kafkato select the Kafka binder dependency. - In the Dependencies text box, type
Cloud Streamto select the Spring Cloud Stream dependency. - In the Dependencies text box, type
Actuatorto select the Spring Boot actuator dependency. - Click on Generate Project.
Now you should unzip the usage-cost-logger-kafka.zip file and import the project into your favorite IDE.
You can either directly download the project generated by the Spring Initialzr and click on Generate Project.
or visit the Spring Initialzr site and follow these instructions:
- Create a new Maven project with a Group name of
io.spring.dataflow.sampleand an Artifact name ofusage-cost-logger-rabbit, and packageio.spring.dataflow.sample.usagecostlogger. - In the Dependencies text box, type
RabbitMQto select the RabbitMQ binder dependency. - In the Dependencies text box, type
Cloud Streamto select the Spring Cloud Stream dependency. - In the Dependencies text box, type
Actuatorto select the Spring Boot actuator dependency. - Click on Generate Project.
Now you should unzip the usage-cost-logger-rabbit.zip file and import the project into your favorite IDE.
Business Logic
To create the business logic:
- Create a
UsageCostDetailclass in theio.spring.dataflow.sample.usagecostloggerpackage. Its contents should resemble the contents of UsageCostDetail.java. TheUsageCostDetailclass containsuserId,callCost, anddataCostproperties. - Create the
UsageCostLoggerclass in theio.spring.dataflow.sample.usagecostloggerpackage, which receives theUsageCostDetailmessage and logs it. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostlogger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageCostLogger {
private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);
@Bean
public Consumer<UsageCostDetail> process() {
return usageCostDetail -> {
logger.info(usageCostDetail.toString());
};
}
}In the preceding application, we declare a Consumer bean that accepts a UsageCostDetail. Spring Cloud Stream will discover this function and bind its input to the input destination configured for the messaging middleware. As discussed in the previous section, Spring Cloud Stream uses the appropriate MessageConverter to perform the necessary type conversion before invoking this Consumer.
Configuration
When configuring the sink application, we need to set:
- The
inputbinding destination (Kafka topic or RabbitMQ exchange) where this application is subscribed. - Optionally, the
groupto specify the consumer group to which this consumer application belongs.
For convenience, we alias the function binding name process-in-0 to input.
In src/main/resources/application.properties, add the following properties:
spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0Building
Now we can build the Usage Cost Logger application.
From the usage-cost-logger's root directory, use the following command to build the project with Maven:
./mvnw clean packageTesting
To unit test the UsageCostLogger, add the following code in the UsageCostLoggerApplicationTests class:
package io.spring.dataflow.sample.usagecostlogger;
import java.util.HashMap;
import java.util.Map;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageCostLogger(CapturedOutput output) {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageCostLoggerApplication.class))
.web(WebApplicationType.NONE)
.run()) {
InputDestination source = context.getBean(InputDestination.class);
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId("user1");
usageCostDetail.setCallCost(3.0);
usageCostDetail.setDataCost(5.0);
MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);
source.send(message);
Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
}
}
}Add the awaitility dependency in pom.xml:
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>- The
contextLoadstest case verifies that the application starts successfully. - The
testUsageCostLoggertest case verifies that theprocessmethod ofUsageCostLoggeris invoked by usingOutputCaptureExtensionfrom Spring Boot's test framework.
Deployment
The next step is to deploy these applications to one of the supported platforms, using the message broker you configured for these applicactions.