In this guide, we develop three Spring Boot applications that use Spring Cloud Stream and deploy them to Cloud Foundry, to Kubernetes, and to your local machine. In another guide, we deploy these applications by using Data Flow. By deploying the applications manually, you get a better understanding of the steps that Data Flow automates for you.
The sections that follow describe how to build these applications from scratch.
If you prefer, you can download a zip file containing the sources for these applications, unzip it, build it, and proceed to the deployment section.
You can download the project that contains all three applications from your browser. You can also use the command line, as the following example shows:
$ wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/stream-developer-guides/streams/standalone-stream-sample/dist/usage-cost-stream-sample.zip?raw=true -O usage-cost-stream-sample.zip
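After downloading, unzip the archive so that you can build it (this assumes the unzip utility is available on your machine):
$ unzip usage-cost-stream-sample.zip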
Building the downloaded sample
The stream applications can be configured to run with either a Kafka broker or RabbitMQ, using a common code base. The only difference is in the executable jar files: to work with a Kafka broker, they require the Kafka binder dependency (enabled by default), and to work with RabbitMQ, they require the Rabbit binder.
Additional Spring Cloud Stream binder implementations include Kafka Streams, Amazon Kinesis, Google PubSub (partner maintained), Solace PubSub+ (partner maintained), and Azure Event Hubs (partner maintained). Binder selection is a build-time step. The sample project uses a Maven profile to enable the corresponding binder, as the sketch below shows.
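A minimal sketch of what such profiles might look like in pom.xml, assuming the standard binder artifact names (the sample project's actual profiles may differ in detail):

<profiles>
    <!-- Kafka binder, active by default -->
    <profile>
        <id>kafka</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
        </dependencies>
    </profile>
    <!-- Rabbit binder, enabled with -Prabbit -->
    <profile>
        <id>rabbit</id>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>
    </profile>
</profiles>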
To build the sample stream apps for Kafka, run the following command from the root project directory:
$ ./mvnw clean package -Pkafka
To build the sample stream apps for RabbitMQ, run the following command from the root project directory:
$ ./mvnw clean package -Prabbit
Development
We create three Spring Cloud Stream applications that communicate by using the configured binder.
The scenario is a cell phone company creating bills for its customers.
Each call made by a user has a duration and an amount of data used during the call.
As part of the process to generate a bill, the raw call data needs to be converted to a cost for the duration of the call and a cost for the amount of data used.
The call is modeled by using the UsageDetail class, which contains the duration of the call and the amount of data used during the call. The bill is modeled by using the UsageCostDetail class, which contains the cost of the call (callCost) and the cost of the data (dataCost). Each class contains an ID (userId) to identify the person making the call.
The three streaming applications are as follows:
- The Source application (named UsageDetailSender) generates the user's call duration and the amount of data used for each userId and sends a message that contains the UsageDetail object as JSON.
- The Processor application (named UsageCostProcessor) consumes the UsageDetail and computes the cost of the call and the cost of the data per userId. It sends the UsageCostDetail object as JSON.
- The Sink application (named UsageCostLogger) consumes the UsageCostDetail object and logs the cost of the call and data.
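Putting the three applications together, the data flows through the following pipeline (the usage-detail and usage-cost destination names are configured later in this guide):

UsageDetailSender --(usage-detail)--> UsageCostProcessor --(usage-cost)--> UsageCostLogger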
Source
In this step, we create the UsageDetailSender
source.
You can either directly download the project generated by the Spring Initializr or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of io.spring.dataflow.sample, an Artifact name of usage-detail-sender-kafka, and a package of io.spring.dataflow.sample.usagedetailsender.
- In the Dependencies text box, type Kafka to select the Kafka binder dependency.
- In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
- Click the Generate Project button.
Now you should unzip
the usage-detail-sender-kafka.zip
file and import the project into your favorite IDE.
There are many configuration options that you can choose to extend or override to achieve the desired runtime behavior when using Kafka as the message broker. The Kafka binder documentation lists the Kafka binder configuration properties.
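For example, to point the application at a Kafka broker other than the default localhost:9092, you could set the following binder property (the address shown is hypothetical):

spring.cloud.stream.kafka.binder.brokers=kafka-broker.example.com:9092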
You can either directly download the project generated by the Spring Initializr or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of io.spring.dataflow.sample, an Artifact name of usage-detail-sender-rabbit, and a package of io.spring.dataflow.sample.usagedetailsender.
- In the Dependencies text box, type RabbitMQ to select the RabbitMQ binder dependency.
- In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
- Click the Generate Project button.
Now you should unzip
the usage-detail-sender-rabbit.zip
file and import the project into your favorite IDE.
Durable Queues
By default, the Spring Cloud Stream consumer application creates an anonymous
auto-delete queue.
This can result in a message not being stored and forwarded by the broker if the producer application starts before the consumer application, since the queue does not yet exist.
Even though the exchange is durable, we need a durable
queue to be bound to the exchange for the message to be stored for later consumption.
Hence, for guaranteed message delivery, you need a durable
queue.
To pre-create durable queues and bind them to the exchange, the producer application should set the following property:
spring.cloud.stream.bindings.<channelName>.producer.requiredGroups
The requiredGroups
property accepts a comma-separated list of groups to which the producer must ensure message delivery.
When this property is set, a durable queue is created by using the <exchange>.<requiredGroup>
format.
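For example, with the output binding alias and usage-detail destination configured later in this guide, and a hypothetical consumer group named usage-cost-consumer, the following setting makes the producer pre-create the durable queue usage-detail.usage-cost-consumer and bind it to the usage-detail exchange:

spring.cloud.stream.bindings.output.producer.requiredGroups=usage-cost-consumer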
There are many configuration options that you can choose to extend or override to achieve the desired runtime behavior when using RabbitMQ as the message broker. The RabbitMQ Binder documentation lists the RabbitMQ binder configuration properties.
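For example, to connect to a RabbitMQ broker other than the default localhost, you can set the standard Spring Boot connection properties (the host shown here is hypothetical):

spring.rabbitmq.host=rabbitmq.example.com
spring.rabbitmq.port=5672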
Business Logic
Now we can create the code required for this application. To do so:
- Create a UsageDetail class in the io.spring.dataflow.sample.usagedetailsender package that looks like the contents in UsageDetail.java. The UsageDetail class contains userId, data, and duration properties.
- Create the UsageDetailSender class in the io.spring.dataflow.sample.usagedetailsender package. It should resemble the following listing:
package io.spring.dataflow.sample.usagedetailsender;

import java.util.Random;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageDetailSender {

    private String[] users = {"user1", "user2", "user3", "user4", "user5"};

    @Bean
    public Supplier<UsageDetail> sendEvents() {
        return () -> {
            UsageDetail usageDetail = new UsageDetail();
            usageDetail.setUserId(this.users[new Random().nextInt(5)]);
            usageDetail.setDuration(new Random().nextInt(300));
            usageDetail.setData(new Random().nextInt(700));
            return usageDetail;
        };
    }
}
The sendEvents Supplier provides a UsageDetail object populated with random values. Spring Cloud Stream automatically binds this function to send its data to the configured output destination. Spring Cloud Stream also configures a poller for any Supplier, which, by default, invokes the function every second. The polling rate is configurable, as the example below shows.
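For instance, assuming a Spring Cloud Stream 3.x version where the spring.cloud.stream.poller properties are supported, the polling interval could be changed to five seconds with:

spring.cloud.stream.poller.fixed-delay=5000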
Configuration
When configuring the source
application, we need to set the output
binding destination (the name of the RabbitMQ exchange or Kafka topic) where the producer will publish the data.
For convenience, we alias the function output binding name, sendEvents-out-0 (indicating the output corresponding to the first output parameter of the sendEvents function), to the logical name output. Alternately, we can bind the output binding name directly: spring.cloud.stream.bindings.sendEvents-out-0.destination=usage-detail. See Functional Binding Names for a more detailed explanation.
In src/main/resources/application.properties
, add the following properties:
spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0
Building
Now we can build the Usage Detail Sender application.
From the usage-detail-sender root directory, use the following command to build the project with Maven:
./mvnw clean package
Testing
Spring Cloud Stream provides a test jar to test the Spring Cloud Stream application:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
Instead of a message broker binder implementation, TestChannelBinderConfiguration
provides an in-memory binder implementation, used to trace and test your application's outbound and inbound messages.
The test configuration includes InputDestination
and OutputDestination
beans to send and receive messages.
To unit test the UsageDetailSender
application, add the following code in the UsageDetailSenderApplicationTests
class:
package io.spring.dataflow.sample.usagedetailsender;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageDetailSenderApplicationTests {

    @Test
    public void contextLoads() {
    }

    @Test
    public void testUsageDetailSender() {
        try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                TestChannelBinderConfiguration
                        .getCompleteConfiguration(UsageDetailSenderApplication.class))
                .web(WebApplicationType.NONE)
                .run()) {

            OutputDestination target = context.getBean(OutputDestination.class);
            Message<byte[]> sourceMessage = target.receive(10000, "usage-detail");

            final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
            UsageDetail usageDetail = (UsageDetail) converter
                    .fromMessage(sourceMessage, UsageDetail.class);

            assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
            assertThat(usageDetail.getData()).isBetween(0L, 700L);
            assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
        }
    }
}
- The contextLoads test case verifies that the application starts successfully.
- The testUsageDetailSender test case uses the OutputDestination to receive and verify the messages sent by the UsageDetailSender.
The in-memory test binder behaves exactly like any of the message broker binder implementations. Notably, in a Spring Cloud Stream application, the message payload is always a byte array, encoded as JSON by default. The consuming application receives the bytes on its input channel and automatically delegates to the appropriate MessageConverter, based on the content type, to convert the bytes to match the consuming function's argument type, UsageDetail in this case. For the test, we need to perform this step explicitly. Alternately, instead of using a MessageConverter, we could have invoked a JSON parser directly, as sketched below.
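A minimal sketch of that alternative, using Jackson's ObjectMapper (available transitively through Spring Boot) to parse the raw payload:

import com.fasterxml.jackson.databind.ObjectMapper;

// Inside the test method: deserialize the raw JSON payload directly.
// readValue throws IOException, so the test method would declare `throws Exception`.
ObjectMapper objectMapper = new ObjectMapper();
UsageDetail usageDetail = objectMapper.readValue(sourceMessage.getPayload(), UsageDetail.class);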
Processor
In this step, we create the UsageCostProcessor
processor.
You can either directly download the project generated by the Spring Initializr or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of io.spring.dataflow.sample, an Artifact name of usage-cost-processor-kafka, and a package of io.spring.dataflow.sample.usagecostprocessor.
- In the Dependencies text box, type Kafka to select the Kafka binder dependency.
- In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
- Click the Generate Project button.
Now you should unzip
the usage-cost-processor-kafka.zip
file and import the project into your favorite IDE.
You can either directly download the project generated by the Spring Initializr or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of io.spring.dataflow.sample, an Artifact name of usage-cost-processor-rabbit, and a package of io.spring.dataflow.sample.usagecostprocessor.
- In the Dependencies text box, type RabbitMQ to select the RabbitMQ binder dependency.
- In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
- Click the Generate Project button.
Now you should unzip
the usage-cost-processor-rabbit.zip
file and import the project into your favorite IDE.
Business Logic
Now we can create the code required for this application. To do so:
- Create the UsageDetail class in the io.spring.dataflow.sample.usagecostprocessor package. Its contents resemble the contents of UsageDetail.java. The UsageDetail class contains userId, data, and duration properties.
- Create the UsageCostDetail class in the io.spring.dataflow.sample.usagecostprocessor package. Its contents resemble the contents of UsageCostDetail.java. The UsageCostDetail class contains userId, callCost, and dataCost properties.
- Create the UsageCostProcessor class in the io.spring.dataflow.sample.usagecostprocessor package, which receives the UsageDetail message, computes the call and data cost, and sends a UsageCostDetail message. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostprocessor;

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostProcessor {

    private double ratePerSecond = 0.1;

    private double ratePerMB = 0.05;

    @Bean
    public Function<UsageDetail, UsageCostDetail> processUsageCost() {
        return usageDetail -> {
            UsageCostDetail usageCostDetail = new UsageCostDetail();
            usageCostDetail.setUserId(usageDetail.getUserId());
            usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
            usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
            return usageCostDetail;
        };
    }
}
In the preceding application, we declare a Function
bean that accepts a UsageDetail
and returns a UsageCostDetail
. Spring Cloud Stream will discover this function and bind its input and output to destinations configured for the messaging middleware. As discussed in the previous section, Spring Cloud Stream uses the appropriate MessageConverter
to perform the necessary type conversion.
Configuration
When configuring the processor
application, we need to set the following properties:
- The input binding destination (Kafka topic or RabbitMQ exchange) where this application is subscribed.
- The output binding destination where the producer will publish the data.
For production applications, it is a good idea to set spring.cloud.stream.bindings.input.group to specify the consumer group to which this consumer application belongs. This ensures that each consumer group, identified by its own group ID, receives every message, while each message is processed by only one instance within a group. Each consumer group may scale to multiple instances to distribute the workload. Spring Cloud Stream abstracts this feature, which is native to Kafka, in order to extend it to RabbitMQ and other binder implementations.
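For example, to place this processor in a consumer group named usage-cost-processor (the group name here is hypothetical), you would add:

spring.cloud.stream.bindings.input.group=usage-cost-processor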
In src/main/resources/application.properties
, add the following properties:
spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0
For convenience, we alias the function binding names, processUsageCost-in-0
and processUsageCost-out-0
to input
and output
, respectively.
- The spring.cloud.stream.bindings.input.destination property binds the UsageCostProcessor object's input to the usage-detail destination.
- The spring.cloud.stream.bindings.output.destination property binds the UsageCostProcessor object's output to the usage-cost destination.
The input destination must be the same as the output destination for the Source application. Likewise, the output destination must be the same as the input destination for the Sink, below.
Building
Now we can build the Usage Cost Processor application.
From the usage-cost-processor root directory, use the following command to build the project with Maven:
./mvnw clean package
Testing
As mentioned above, Spring Cloud Stream provides a test jar to test the Spring Cloud Stream application:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
TestChannelBinderConfiguration
provides an in-memory binder implementation, used to trace and test your application's outbound and inbound messages.
The test configuration includes InputDestination
and OutputDestination
beans to send and receive messages.
To unit test the UsageCostProcessor application, add the following code in the UsageCostProcessorApplicationTests class:
package io.spring.dataflow.sample.usagecostprocessor;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageCostProcessorApplicationTests {

    @Test
    public void contextLoads() {
    }

    @Test
    public void testUsageCostProcessor() {
        try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                TestChannelBinderConfiguration.getCompleteConfiguration(
                        UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
                .run()) {

            InputDestination source = context.getBean(InputDestination.class);

            UsageDetail usageDetail = new UsageDetail();
            usageDetail.setUserId("user1");
            usageDetail.setDuration(30L);
            usageDetail.setData(100L);

            final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
            Map<String, Object> headers = new HashMap<>();
            headers.put("contentType", "application/json");
            MessageHeaders messageHeaders = new MessageHeaders(headers);
            final Message<?> message = converter.toMessage(usageDetail, messageHeaders);
            source.send(message);

            OutputDestination target = context.getBean(OutputDestination.class);
            Message<byte[]> sourceMessage = target.receive(10000);
            final UsageCostDetail usageCostDetail = (UsageCostDetail) converter
                    .fromMessage(sourceMessage, UsageCostDetail.class);

            assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
            assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
        }
    }
}
- The contextLoads test case verifies that the application starts successfully.
- The testUsageCostProcessor test case uses the InputDestination to send a message and the OutputDestination to receive and verify the message. Given the rates of 0.1 per second and 0.05 per MB, the 30-second call with 100 MB of data yields a callCost of 3.0 and a dataCost of 5.0.
Sink
In this step, we create the UsageCostLogger
sink.
You can either directly download the project generated by the Spring Initializr or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of io.spring.dataflow.sample, an Artifact name of usage-cost-logger-kafka, and a package of io.spring.dataflow.sample.usagecostlogger.
- In the Dependencies text box, type Kafka to select the Kafka binder dependency.
- In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
- Click the Generate Project button.
Now you should unzip
the usage-cost-logger-kafka.zip
file and import the project into your favorite IDE.
You can either directly download the project generated by the Spring Initializr or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of io.spring.dataflow.sample, an Artifact name of usage-cost-logger-rabbit, and a package of io.spring.dataflow.sample.usagecostlogger.
- In the Dependencies text box, type RabbitMQ to select the RabbitMQ binder dependency.
- In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
- Click the Generate Project button.
Now you should unzip
the usage-cost-logger-rabbit.zip
file and import the project into your favorite IDE.
Business Logic
To create the business logic:
- Create a UsageCostDetail class in the io.spring.dataflow.sample.usagecostlogger package. Its contents should resemble the contents of UsageCostDetail.java. The UsageCostDetail class contains userId, callCost, and dataCost properties.
- Create the UsageCostLogger class in the io.spring.dataflow.sample.usagecostlogger package, which receives the UsageCostDetail message and logs it. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostlogger;

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostLogger {

    private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);

    @Bean
    public Consumer<UsageCostDetail> process() {
        return usageCostDetail -> {
            logger.info(usageCostDetail.toString());
        };
    }
}
In the preceding application, we declare a Consumer
bean that accepts a UsageCostDetail
. Spring Cloud Stream will discover this function and bind its input to the input destination configured for the messaging middleware. As discussed in the previous section, Spring Cloud Stream uses the appropriate MessageConverter
to perform the necessary type conversion before invoking this Consumer.
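Note that the logged output format is determined by UsageCostDetail's toString method, which the sample's UsageCostDetail.java defines. A sketch of such a method, matching the JSON-style output asserted in the test later in this section (and assuming the class's userId, callCost, and dataCost fields):

@Override
public String toString() {
    // Render the bill as JSON-like text, e.g. {"userId": "user1", "callCost": "3.0", "dataCost": "5.0" }
    return "{\"userId\": \"" + userId + "\", \"callCost\": \"" + callCost + "\", \"dataCost\": \"" + dataCost + "\" }";
}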
Configuration
When configuring the sink
application, we need to set:
- The input binding destination (Kafka topic or RabbitMQ exchange) where this application is subscribed.
- Optionally, the group to specify the consumer group to which this consumer application belongs.
For convenience, we alias the function binding name process-in-0
to input
.
In src/main/resources/application.properties
, add the following properties:
spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0
Building
Now we can build the Usage Cost Logger application.
From the usage-cost-logger root directory, use the following command to build the project with Maven:
./mvnw clean package
Testing
To unit test the UsageCostLogger
, add the following code in the UsageCostLoggerApplicationTests
class:
package io.spring.dataflow.sample.usagecostlogger;

import java.util.HashMap;
import java.util.Map;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {

    @Test
    public void contextLoads() {
    }

    @Test
    public void testUsageCostLogger(CapturedOutput output) {
        try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                TestChannelBinderConfiguration
                        .getCompleteConfiguration(UsageCostLoggerApplication.class))
                .web(WebApplicationType.NONE)
                .run()) {

            InputDestination source = context.getBean(InputDestination.class);

            UsageCostDetail usageCostDetail = new UsageCostDetail();
            usageCostDetail.setUserId("user1");
            usageCostDetail.setCallCost(3.0);
            usageCostDetail.setDataCost(5.0);

            final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
            Map<String, Object> headers = new HashMap<>();
            headers.put("contentType", "application/json");
            MessageHeaders messageHeaders = new MessageHeaders(headers);
            final Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);
            source.send(message);

            Awaitility.await().until(output::getOut,
                    value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
        }
    }
}
Add the awaitility dependency to pom.xml:
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
- The contextLoads test case verifies that the application starts successfully.
- The testUsageCostLogger test case verifies that the process method of UsageCostLogger is invoked, by using OutputCaptureExtension from Spring Boot's test framework.
Deployment
The next step is to deploy these applications to one of the supported platforms, using the message broker you configured for these applications.