Programming Models

Spring Cloud Stream provides the flexibility to build the streaming applications by using different programming models.

  • Functional
  • Kafka Streams (when using Kafak Streams binder)

The annotation-based programming model (@EnableBinding / @StreamListener) has been deprecated in Spring Cloud Stream 3.2.x.

In the sections that follow, we review how a single example of business logic can be built with different programming models.

To highlight the use of programming with a concrete example, consider a scenario in which we receive data from an HTTP endpoint. Once the data is available, we want to transform the payload by adding prefix and suffixes. Finally, we want to verify the transformed data.

Out-of-the-Box Source and Sink

To demonstrate the previously mentioned use case, we start by taking a look at the two out-of-the-box applications:

Custom Processor

For the data transformation between the source and sink steps, we highlight a custom processor application and use that as a base to demonstrate different programming models.

Code:

public class FunctionStreamSampleProcessor {

	@Bean
	public Function<String, String> messenger() {
		return data -> "Hello: " + data + "!";
	}
}
public class KafkaStreamsSampleProcessor {

	@Bean
	public Consumer<KStream<String, String>> messenger() {
		return data -> data.map((k, v) -> new KeyValue<>(null, "Hello: " + v + "!"));
	}
}

The business logic in the processor transforms the received payload by adding the "Hello: " prefix and the "!" suffix.

The same "business logic" can be implemented with different programming models, and each of the variations implements a simple messenger function, which can be independently tested and evolved in isolation.

Takeaway: Developers have the choice to choose from the available programming model styles.

Configuration: (application.properties)

spring.cloud.stream.bindings.input.destination=incomingDataTopic
spring.cloud.stream.bindings.output.destination=outgoingDataTopic
spring.cloud.stream.bindings.input.destination=incomingDataTopic
spring.cloud.stream.bindings.output.destination=outgoingDataTopic

spring.cloud.stream.kafka.streams.binder.applicationId=kstreams-sample

In the Kafka Streams configuration, you may have noticed the extra property, spring.cloud.stream.kafka.streams.binder.applicationId, which is required by the framework internally to identify the Kafka Streams application uniquely.

See the Spring Cloud Stream reference docs for more information on the Functional and Kafka Streams programming models.

Composing Functional Beans in Processor Applications

Functional composition support is not applicable for the out-of-the-box Spring Cloud Stream Processor applications, since there is ambiguity in whether the function needs to be applied before or after the existing processor’s application logic. It is hard to determine that.

However, you can create your own processor applications that use functional composition with the standard java.util.Function APIs, as follow:

@Configuration
public static class FunctionProcessorConfiguration {

  @Bean
  public Function<String, String> upperAndConcat() {
  return upper().andThen(concat());
  }

  @Bean
  public Function<String, String> upper() {
     return value -> value.toUpperCase();
  }

  @Bean
  public Function<String, String> concat() {
     return value -> "Hello "+ value;
  }
}

When you deploy your stream with the custom processor application, you need to deploy the processor application by defining the following property: spring.cloud.stream.function.definition to compose functional beans.

In this example, it would be set to:

spring.cloud.stream.function.definition=upper|concat