Stream Processing with Apache Kafka
In this guide, we develop three Spring Boot applications that use Spring Cloud Stream's support for Apache Kafka and deploy them to Cloud Foundry, Kubernetes, and your local machine. In another guide, we deploy these applications by using Spring Cloud Data Flow. By deploying the applications manually, you get a better understanding of the steps that Data Flow can automate for you.
The following sections describe how to build these applications from scratch. If you prefer, you can download a zip file that contains the sources for these applications, unzip it, and proceed to the deployment section.
You can download a zip file that contains all three completed applications from your browser. You can also download it from the command line by using the following command:
wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/dist/usage-cost-stream-kafka.zip?raw=true -O usage-cost-stream-kafka.zip
Development
We create three Spring Cloud Stream applications that communicate using Kafka.
The scenario is a cell phone company creating bills for its customers.
Each call made by a user has a duration and an amount of data used during the call.
As part of the process to generate a bill, the raw call data needs to be converted to a cost for the duration of the call and a cost for the amount of data used.
The call is modeled by using the UsageDetail class, which contains the duration of the call and the amount of data used during the call. The bill is modeled by using the UsageCostDetail class, which contains the cost of the call (callCost) and the cost of the data (dataCost). Each class contains an ID (userId) to identify the person making the call.
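If it helps to see the models up front, the following is a minimal sketch of the two classes, assuming plain getters and setters and a JSON-style toString (the field types and the toString format are assumptions based on how the classes are used later in this guide; the UsageDetail.java and UsageCostDetail.java files referenced below are the authoritative versions). The package shown matches the import statements in the code listings:
package io.spring.dataflow.sample;

// Sketch of UsageDetail.java: the raw call record produced by the source.
public class UsageDetail {
    private String userId;
    private long duration; // call duration, in seconds (type is an assumption)
    private long data;     // data used during the call, in MB (type is an assumption)

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public long getDuration() { return duration; }
    public void setDuration(long duration) { this.duration = duration; }
    public long getData() { return data; }
    public void setData(long data) { this.data = data; }
}

// Sketch of UsageCostDetail.java (a separate file in the same package): the priced record.
public class UsageCostDetail {
    private String userId;
    private double callCost;
    private double dataCost;

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public double getCallCost() { return callCost; }
    public void setCallCost(double callCost) { this.callCost = callCost; }
    public double getDataCost() { return dataCost; }
    public void setDataCost(double dataCost) { this.dataCost = dataCost; }

    @Override
    public String toString() {
        // Matches the log format shown later in this guide.
        return "{\"userId\": \"" + userId + "\", \"callCost\": \"" + callCost
                + "\", \"dataCost\": \"" + dataCost + "\" }";
    }
}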
The three streaming applications are as follows:
- The Source application (named UsageDetailSender) generates the user's call duration and amount of data used per userId and sends a message containing the UsageDetail object as JSON.
- The Processor application (named UsageCostProcessor) consumes the UsageDetail and computes the cost of the call and the cost of the data per userId. It sends the UsageCostDetail object as JSON.
- The Sink application (named UsageCostLogger) consumes the UsageCostDetail object and logs the cost of the call and the cost of the data.
UsageDetailSender source
Either download the Initializr-generated project directly or visit the Spring Initializr site and follow these instructions (a command-line alternative appears after the steps):
- Create a new Maven project with a Group name of io.spring.dataflow.sample and an Artifact name of usage-detail-sender-kafka.
- In the Dependencies text box, type Kafka to select the Kafka binder dependency.
- In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
- If your target platform is Cloud Foundry, type Cloud Connectors to select the Spring Cloud Connector dependency.
- Click the Generate Project button.
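If you prefer the command line, you can generate an equivalent project against the Spring Initializr API. The following is a sketch; the dependency identifiers (kafka, cloud-stream, actuator) are assumptions, so verify them against the start.spring.io site:
curl https://start.spring.io/starter.zip \
  -d type=maven-project \
  -d groupId=io.spring.dataflow.sample \
  -d artifactId=usage-detail-sender-kafka \
  -d dependencies=kafka,cloud-stream,actuator \
  -o usage-detail-sender-kafka.zip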
Now you should unzip the usage-detail-sender-kafka.zip file and import the project into your favorite IDE.
Business Logic
Now we can create the code required for this application. To do so:
- Create a UsageDetail class in the io.spring.dataflow.sample.usagedetailsender package with content that resembles UsageDetail.java. This UsageDetail model contains userId, data, and duration properties.
- Create the UsageDetailSender class in the io.spring.dataflow.sample.usagedetailsender package with content that resembles the following:
package io.spring.dataflow.sample.usagedetailsender;
import java.util.Random;
import io.spring.dataflow.sample.UsageDetail;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@EnableScheduling
@EnableBinding(Source.class)
public class UsageDetailSender {
@Autowired
private Source source;
private String[] users = {"user1", "user2", "user3", "user4", "user5"};
@Scheduled(fixedDelay = 1000)
public void sendEvents() {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
}
}
The @EnableBinding annotation indicates that you want to bind your application to the messaging middleware. The annotation takes one or more interfaces as a parameter (in this case, the Source interface, which defines an output channel named output). In the case of Kafka, messages sent to the output channel are, in turn, sent to a Kafka topic.
The @EnableScheduling annotation indicates that you want to enable Spring's scheduling capabilities, which invoke methods annotated with @Scheduled at the specified fixedDelay of 1 second.
The sendEvents method constructs a UsageDetail object and then sends it to the output channel by calling the Source object's output().send() method.
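For reference, the Source interface that ships with Spring Cloud Stream is essentially the following:
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {

    // The name of the output channel (and the default binding name).
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}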
Configuring the UsageDetailSender Application
When configuring the producer application, we need to set the output binding destination (the Kafka topic) where the producer publishes the data.
In src/main/resources/application.properties, you can add the following property:
spring.cloud.stream.bindings.output.destination=usage-detail
The spring.cloud.stream.bindings.output.destination property binds the UsageDetailSender object's output to the usage-detail Kafka topic.
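Because this is a standard Spring Boot property, you can also override it at startup without rebuilding the application. For example, the following invocation (with a hypothetical topic name) redirects the output to a different topic:
java -jar target/usage-detail-sender-kafka-0.0.1-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=usage-detail-test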
Building
Now we can build the Usage Detail Sender application.
In the usage-detail-sender directory, run the following command to build the project with Maven:
./mvnw clean package
Testing
Spring Cloud Stream provides the spring-cloud-stream-test-support dependency to test the Spring Cloud Stream application. Instead of the Kafka binder, the tests use the Test binder to trace and test your application's outbound and inbound messages. The Test binder uses a utility class called MessageCollector, which stores the messages in memory.
To unit test this UsageDetailSender application, add the following code in the UsageDetailSenderApplicationTests class:
package io.spring.dataflow.sample.usagedetailsender;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertTrue;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsageDetailSenderApplicationTests {
@Autowired
private MessageCollector messageCollector;
@Autowired
private Source source;
@Test
public void contextLoads() {
}
@Test
public void testUsageDetailSender() throws Exception {
Message message = this.messageCollector.forChannel(this.source.output()).poll(1, TimeUnit.SECONDS);
String usageDetailJSON = message.getPayload().toString();
assertTrue(usageDetailJSON.contains("userId"));
assertTrue(usageDetailJSON.contains("duration"));
assertTrue(usageDetailJSON.contains("data"));
}
}
When using the spring-cloud-stream-test-support dependency, your application's output and input are bound to the Test binder.
- The contextLoads test case verifies the application starts successfully.
- The testUsageDetailSender test case uses the Test binder's MessageCollector to collect the messages sent by the UsageDetailSender.
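You can run this test class on its own with ./mvnw test. Note that the ./mvnw clean package command shown in the previous section also runs it as part of the build.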
UsageCostProcessor processor
Either download the Initializr-generated project directly or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of io.spring.dataflow.sample and an Artifact name of usage-cost-processor-kafka.
- In the Dependencies text box, type Kafka to select the Kafka binder dependency.
- In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
- If your target platform is Cloud Foundry, type Cloud Connectors to select the Spring Cloud Connector dependency.
- Click the Generate Project button.
Now you should unzip the usage-cost-processor-kafka.zip file and import the project into your favorite IDE.
Business Logic
Now we can create the code required for this application.
- Create the UsageDetail class in the io.spring.dataflow.sample.usagecostprocessor package with content that resembles UsageDetail.java. The UsageDetail class contains userId, data, and duration properties.
- Create the UsageCostDetail class in the io.spring.dataflow.sample.usagecostprocessor package with content that resembles UsageCostDetail.java. This UsageCostDetail class contains userId, callCost, and dataCost properties.
- Create the UsageCostProcessor class in the io.spring.dataflow.sample.usagecostprocessor package that receives the UsageDetail message, computes the call and data cost, and sends a UsageCostDetail message. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostprocessor;
import io.spring.dataflow.sample.UsageCostDetail;
import io.spring.dataflow.sample.UsageDetail;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
@EnableBinding(Processor.class)
public class UsageCostProcessor {
private double ratePerSecond = 0.1;
private double ratePerMB = 0.05;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public UsageCostDetail processUsageCost(UsageDetail usageDetail) {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
}
}
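As a concrete example of this computation: with ratePerSecond = 0.1 and ratePerMB = 0.05, a UsageDetail with a duration of 101 and data of 502 yields callCost = 101 * 0.1 = 10.1 and dataCost = 502 * 0.05 = 25.1. (Due to double-precision floating-point arithmetic, 101 * 0.1 actually evaluates to 10.100000000000001 in Java, which is why the unit test later in this section asserts that exact value.)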
In the preceding application, the @EnableBinding annotation indicates that you want to bind your application to the messaging middleware. The annotation takes one or more interfaces as a parameter (in this case, the Processor interface, which defines both an input channel and an output channel).
The @StreamListener annotation binds the application's input channel to the processUsageCost method by converting the incoming JSON into a UsageDetail object. We configure the Kafka topic that is bound to the input channel later.
The @SendTo annotation sends the processUsageCost method's output to the application's output channel, which is, in turn, sent to a Kafka topic that we configure later.
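For reference, the Processor interface that ships with Spring Cloud Stream simply combines the Source and Sink interfaces, inheriting both the output channel and the input channel:
package org.springframework.cloud.stream.messaging;

public interface Processor extends Source, Sink {
}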
Configuring the UsageCostProcessor Application
When configuring the consumer application, we need to set the input binding destination (a Kafka topic). Since the UsageCostProcessor application is also a producer application, we need to set the output binding destination (a Kafka topic) where the producer publishes the data.
In src/main/resources/application.properties, you can add the following properties:
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
- The spring.cloud.stream.bindings.input.destination property binds the UsageCostProcessor object's input to the usage-detail Kafka topic.
- The spring.cloud.stream.bindings.output.destination property binds the UsageCostProcessor object's output to the usage-cost Kafka topic.
Building
Now we can build the Usage Cost Processor application.
In the usage-cost-processor directory, run the following command to build the project with Maven:
./mvnw clean package
Testing
Spring Cloud Stream provides the spring-cloud-stream-test-support dependency to test the Spring Cloud Stream application. Instead of the Kafka binder, the tests use the Test binder to trace and test your application's outbound and inbound messages. The Test binder uses a utility class called MessageCollector, which stores the messages in memory.
To unit test the UsageCostProcessor, add the following code in the UsageCostProcessorApplicationTests class:
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertTrue;
@RunWith(SpringRunner.class)
@SpringBootTest
public class UsageCostProcessorApplicationTests {
@Autowired
private Processor processor;
@Autowired
private MessageCollector messageCollector;
@Test
public void contextLoads() {
}
@Test
public void testUsageCostProcessor() throws Exception {
this.processor.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"duration\":101,\"data\":502}").build());
Message message = this.messageCollector.forChannel(this.processor.output()).poll(1, TimeUnit.SECONDS);
assertTrue(message.getPayload().toString().equals("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}"));
}
}
- The contextLoads test case verifies the application starts successfully.
- The testUsageCostProcessor test case uses the Test binder's MessageCollector to collect the messages from the UsageCostProcessor object's output.
UsageCostLogger sink
Either download the Initializr-generated project directly or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of io.spring.dataflow.sample and an Artifact name of usage-cost-logger-kafka.
- In the Dependencies text box, type Kafka to select the Kafka binder dependency.
- In the Dependencies text box, type Cloud Stream to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type Actuator to select the Spring Boot actuator dependency.
- If your target platform is Cloud Foundry, type Cloud Connectors to select the Spring Cloud Connector dependency.
- Click the Generate Project button.
Now you should unzip the usage-cost-logger-kafka.zip file and import the project into your favorite IDE.
Business Logic
Now we can create the business logic for the sink application. To do so:
- Create a UsageCostDetail class in the io.spring.dataflow.sample.usagecostlogger package with content that resembles UsageCostDetail.java. The UsageCostDetail class contains userId, callCost, and dataCost properties.
- Create the UsageCostLogger class in the io.spring.dataflow.sample.usagecostlogger package to receive the UsageCostDetail message and log it. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostlogger;
import io.spring.dataflow.sample.UsageCostDetail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class UsageCostLogger {
private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);
@StreamListener(Sink.INPUT)
public void process(UsageCostDetail usageCostDetail) {
logger.info(usageCostDetail.toString());
}
}
In the preceding application, the @EnableBinding annotation indicates that you want to bind your application to the messaging middleware. The annotation takes one or more interfaces as a parameter (in this case, the Sink interface, which defines the input channel).
The @StreamListener annotation binds the application's input channel to the process method by converting the incoming JSON to a UsageCostDetail object. We configure the Kafka topic that is bound to the input channel later.
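For reference, the Sink interface that ships with Spring Cloud Stream is essentially the following:
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {

    // The name of the input channel (and the default binding name).
    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}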
Configuring the UsageCostLogger Application
When configuring the consumer application, we need to set the input binding destination (a Kafka topic).
In src/main/resources/application.properties, you can add the following property:
spring.cloud.stream.bindings.input.destination=usage-cost
The spring.cloud.stream.bindings.input.destination property binds the UsageCostLogger object's input to the usage-cost Kafka topic.
Building
Now we can build the Usage Cost Logger application.
In the usage-cost-logger directory, run the following command to build the project with Maven:
./mvnw clean package
Testing
To unit test the UsageCostLogger, add the following code in the UsageCostLoggerApplicationTests class:
package io.spring.dataflow.sample.usagecostlogger;
import io.spring.dataflow.sample.UsageCostDetail;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsageCostLoggerApplicationTests {
@Autowired
protected Sink sink;
@Autowired
protected UsageCostLogger usageCostLogger;
@Test
public void contextLoads() {
}
@Test
public void testUsageCostLogger() throws Exception {
ArgumentCaptor<UsageCostDetail> captor = ArgumentCaptor.forClass(UsageCostDetail.class);
this.sink.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}").build());
verify(this.usageCostLogger).process(captor.capture());
}
@EnableAutoConfiguration
@EnableBinding(Sink.class)
static class TestConfig {
// Override `UsageCostLogger` bean for spying.
@Bean
@Primary
public UsageCostLogger usageCostLogger() {
return spy(new UsageCostLogger());
}
}
}
- The contextLoads test case verifies the application starts successfully.
- The testUsageCostLogger test case verifies that the process method of UsageCostLogger is invoked by using Mockito. To do this, the TestConfig static class overrides the existing UsageCostLogger bean to create a mock of UsageCostLogger. Since we are mocking the UsageCostLogger bean, TestConfig must also explicitly apply the @EnableBinding and @EnableAutoConfiguration annotations.
Deployment
In this section, we deploy the applications we created earlier to the local machine, to Cloud Foundry, and to Kubernetes.
When you deploy these three applications (UsageDetailSender, UsageCostProcessor, and UsageCostLogger), the flow of messages is as follows:
UsageDetailSender -> UsageCostProcessor -> UsageCostLogger
The UsageDetailSender source application's output is connected to the UsageCostProcessor processor application's input. The UsageCostProcessor application's output is connected to the UsageCostLogger sink application's input.
When these applications run, the Kafka binder binds the applications' output and input boundaries to the corresponding topics in Kafka.
Local
This section shows how to run the three applications as standalone applications in your local environment.
If you have not already done so, you must download and set up Kafka in your local environment. After unpacking the downloaded archive, you can start the ZooKeeper and Kafka servers by running the following commands:
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
Running the Source
By using the pre-defined configuration properties (along with a unique server port) for UsageDetailSender, you can run the application, as follows:
java -jar target/usage-detail-sender-kafka-0.0.1-SNAPSHOT.jar --server.port=9001 &
Now you can see the messages being sent to the usage-detail Kafka topic by using the Kafka console consumer, as follows:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic usage-detail
To list the topics, run the following command:
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
Running the Processor
By using the pre-defined configuration properties (along with a unique server port) for UsageCostProcessor, you can run the application, as follows:
java -jar target/usage-cost-processor-kafka-0.0.1-SNAPSHOT.jar --server.port=9002 &
With the UsageDetail data arriving in the usage-detail Kafka topic from the UsageDetailSender source application, you can see the UsageCostDetail data in the usage-cost Kafka topic, as follows:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic usage-cost
Running the Sink
By using the pre-defined configuration properties (along with a unique server port) for UsageCostLogger, you can run the application, as follows:
java -jar target/usage-cost-logger-kafka-0.0.1-SNAPSHOT.jar --server.port=9003 &
Now you can see that this application logs the usage cost detail.
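The log lines should resemble the following (the values here are illustrative; the actual userId, callCost, and dataCost values vary from run to run):
2019-05-08 08:16:46.442  INFO 10769 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication     : {"userId": "user2", "callCost": "28.3", "dataCost": "29.8" }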
Cloud Foundry
This section walks you through how to deploy the UsageDetailSender, UsageCostProcessor, and UsageCostLogger applications on Cloud Foundry.
Create a CF Manifest for the UsageDetailSender
You need to create a CF manifest YAML file called usage-detail-sender.yml for the UsageDetailSender to define its configuration properties, as follows:
applications:
- name: usage-detail-sender
timeout: 120
path: ./target/usage-detail-sender-kafka-0.0.1-SNAPSHOT.jar
memory: 1G
buildpack: java_buildpack
env:
SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: [Kafka_Service_IP_Address:Kafka_Service_Port]
SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES: [ZooKeeper_Service_IP_Address:ZooKeeper_Service_Port]
Then you need to push the UsageDetailSender application by using its manifest YAML file, as follows:
cf push -f usage-detail-sender.yml
You need to create a CF manifest YAML file called usage-cost-processor.yml for the UsageCostProcessor to define its configuration properties, as follows:
applications:
- name: usage-cost-processor
timeout: 120
path: ./target/usage-cost-processor-kafka-0.0.1-SNAPSHOT.jar
memory: 1G
buildpack: java_buildpack
env:
SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: [Kafka_Service_IP_Address:Kafka_Service_Port]
SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES: [ZooKeeper_Service_IP_Address:ZooKeeper_Service_Port]
Then you need to push the UsageCostProcessor application by using its manifest YAML file, as follows:
cf push -f usage-cost-processor.yml
You need to create a CF manifest YAML file called usage-cost-logger.yml for the UsageCostLogger to define its configuration properties, as follows:
applications:
- name: usage-cost-logger
timeout: 120
path: ./target/usage-cost-logger-kafka-0.0.1-SNAPSHOT.jar
memory: 1G
buildpack: java_buildpack
env:
SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: [Kafka_Service_IP_Address:Kafka_Service_Port]
SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES: [ZooKeeper_Service_IP_Address:ZooKeeper_Service_Port]
Then you need to push the UsageCostLogger application by using its manifest YAML file, as follows:
cf push -f usage-cost-logger.yml
You can see the applications by running the cf apps command, as the following example (with output) shows:
cf apps
name requested state instances memory disk urls
usage-cost-logger started 1/1 1G 1G usage-cost-logger.cfapps.io
usage-cost-processor started 1/1 1G 1G usage-cost-processor.cfapps.io
usage-detail-sender started 1/1 1G 1G usage-detail-sender.cfapps.io
You can tail the usage-cost-logger application's logs (for example, by running cf logs usage-cost-logger) to verify that it logs the usage cost details, as the following sample output shows:
2019-05-13T23:23:33.36+0530 [APP/PROC/WEB/0] OUT 2019-05-13 17:53:33.362 INFO 15 --- [e-cost.logger-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user5", "callCost": "1.0", "dataCost": "12.350000000000001" }
2019-05-13T23:23:33.46+0530 [APP/PROC/WEB/0] OUT 2019-05-13 17:53:33.467 INFO 15 --- [e-cost.logger-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user1", "callCost": "19.0", "dataCost": "10.0" }
2019-05-13T23:23:34.46+0530 [APP/PROC/WEB/0] OUT 2019-05-13 17:53:34.466 INFO 15 --- [e-cost.logger-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user4", "callCost": "2.2", "dataCost": "5.15" }
2019-05-13T23:23:35.46+0530 [APP/PROC/WEB/0] OUT 2019-05-13 17:53:35.469 INFO 15 --- [e-cost.logger-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user3", "callCost": "21.0", "dataCost": "17.3" }
Kubernetes
This section walks you through how to deploy the three Spring Cloud Stream applications on Kubernetes.
Setting up the Kubernetes Cluster
This section requires a running Kubernetes cluster. For this example, we deploy to minikube.
Verifying Minikube is Running
To verify that Minikube is running, run the following command (shown with typical output if Minikube is running):
$ minikube status
host: Running
kubelet: Running
apiserver: Running
kubectl: Correctly Configured: pointing to minikube-vm at 192.168.99.100
Installing Apache Kafka
Now we can install the Kafka message broker by using the default configuration from Spring Cloud Data Flow. To do so, run the following command:
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.3.1.RELEASE/src/kubernetes/kafka/kafka-deployment.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.3.1.RELEASE/src/kubernetes/kafka/kafka-svc.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.3.1.RELEASE/src/kubernetes/kafka/kafka-zk-deployment.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.3.1.RELEASE/src/kubernetes/kafka/kafka-zk-svc.yaml
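Before building the application images, you can confirm that the Kafka and ZooKeeper pods have started by running the following command (the exact pod names depend on the deployment):
kubectl get pods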
Building Docker Images
To build the Docker images, we use the Jib Maven plugin.
If you downloaded the source distribution, the Jib plugin is already configured.
If you built the apps from scratch, add the following under plugins in each pom.xml file:
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>0.10.1</version>
<configuration>
<from>
<image>springcloud/openjdk</image>
</from>
<to>
<image>${docker.org}/${project.artifactId}:${docker.version}</image>
</to>
<container>
<useCurrentTimestamp>true</useCurrentTimestamp>
</container>
</configuration>
</plugin>
Then add the following properties under the properties section of each pom.xml file. For this example, we use the following properties:
<docker.org>springcloudstream</docker.org>
<docker.version>${project.version}</docker.version>
Now you can run the Maven build to create the Docker images in the minikube Docker registry. To do so, run the following commands:
$ eval $(minikube docker-env)
$ ./mvnw package jib:dockerBuild
If you downloaded the project source, the project includes a parent pom to let you build all the modules with a single command.
Otherwise, run the build for the source, processor, and sink individually.
You need only run eval $(minikube docker-env) once for each terminal session.
Deploying the Stream
To deploy the stream, copy the following YAML into a file named usage-cost-stream.yaml:
kind: Pod
apiVersion: v1
metadata:
name: usage-detail-sender
labels:
app: usage-cost-stream
spec:
containers:
- name: usage-detail-sender
image: springcloudstream/usage-detail-sender-kafka:0.0.1-SNAPSHOT
ports:
- containerPort: 80
protocol: TCP
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka
- name: SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION
value: user-details
- name: SERVER_PORT
value: '80'
restartPolicy: Always
---
kind: Pod
apiVersion: v1
metadata:
name: usage-cost-processor
labels:
app: usage-cost-stream
spec:
containers:
- name: usage-cost-processor
image: springcloudstream/usage-cost-processor-kafka:0.0.1-SNAPSHOT
ports:
- containerPort: 80
protocol: TCP
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP
value: usage-cost-stream
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION
value: user-details
- name: SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION
value: user-cost
- name: SERVER_PORT
value: '80'
restartPolicy: Always
---
kind: Pod
apiVersion: v1
metadata:
name: usage-cost-logger
labels:
app: usage-cost-stream
spec:
containers:
- name: usage-cost-logger
image: springcloudstream/usage-cost-logger-kafka:0.0.1-SNAPSHOT
ports:
- containerPort: 80
protocol: TCP
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP
value: usage-cost-stream
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION
value: user-cost
- name: SERVER_PORT
value: '80'
restartPolicy: Always
Then you need to deploy the apps by running the following command:
kubectl apply -f usage-cost-stream.yaml
If all is well, you should see the following output:
pod/usage-detail-sender created
pod/usage-cost-processor created
pod/usage-cost-logger created
The preceding YAML specifies three pod resources, for the source, processor, and sink applications. Each pod has a single container that references the corresponding Docker image.
We set the Kafka binding parameters as environment variables. We also set the logical hostname for the Kafka broker so that each application can connect to it; here, we use the Kafka service name (kafka). We set the app: usage-cost-stream label to logically group our apps.
The input and output destination names have to be correct to wire the stream: the output of the source must be the same as the input of the processor, and the output of the processor must be the same as the input of the sink. We set the inputs and outputs as follows:
- Usage Detail Sender: SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=user-details
- Usage Cost Processor: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=user-details and SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=user-cost
- Usage Cost Logger: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=user-cost
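After deploying, you can confirm that all three pods are running by selecting on the app label we set earlier:
kubectl get pods -l app=usage-cost-stream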
Verifying the Deployment
You can use the following command to tail the log for the usage-cost-logger sink:
kubectl logs -f usage-cost-logger
You should see messages similar to the following messages:
2019-05-02 15:48:18.550 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Mark", "callCost": "21.1", "dataCost": "26.05" }
2019-05-02 15:48:19.553 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Ilaya", "callCost": "4.2", "dataCost": "15.75" }
2019-05-02 15:48:20.549 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Mark", "callCost": "28.400000000000002", "dataCost": "15.0" }
2019-05-02 15:48:21.553 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Ilaya", "callCost": "16.8", "dataCost": "28.5" }
2019-05-02 15:48:22.551 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Mark", "callCost": "22.700000000000003", "dataCost": "20.3" }
2019-05-02 15:48:23.556 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "16.6", "dataCost": "2.6" }
2019-05-02 15:48:24.557 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "6.7", "dataCost": "1.0" }
2019-05-02 15:48:25.555 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Glenn", "callCost": "3.7", "dataCost": "2.6500000000000004" }
2019-05-02 15:48:26.557 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "24.200000000000003", "dataCost": "32.9" }
2019-05-02 15:48:27.556 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Glenn", "callCost": "19.200000000000003", "dataCost": "7.4" }
2019-05-02 15:48:28.559 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Sabby", "callCost": "17.7", "dataCost": "27.35" }
2019-05-02 15:48:29.562 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Ilaya", "callCost": "26.8", "dataCost": "32.45" }
2019-05-02 15:48:30.561 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "26.5", "dataCost": "33.300000000000004" }
2019-05-02 15:48:31.562 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Sabby", "callCost": "16.1", "dataCost": "5.0" }
2019-05-02 15:48:32.564 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "16.3", "dataCost": "23.6" }
2019-05-02 15:48:33.567 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Ilaya", "callCost": "29.400000000000002", "dataCost": "2.1" }
2019-05-02 15:48:34.567 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "5.2", "dataCost": "20.200000000000003" }
Cleaning up
To delete the stream, we can use the label we created earlier. The following command shows how to do so:
kubectl delete pod -l app=usage-cost-stream
To uninstall Kafka, run the following command:
kubectl delete all -l app=kafka
What's Next
The RabbitMQ guide shows you how to create the same three applications but with RabbitMQ instead of Kafka. Alternatively, you can use Spring Cloud Data Flow to deploy the three applications, as described in Create and Deploy a Stream Processing Pipeline using Spring Cloud Data Flow.