Create and Deploy a Python Application

This recipe shows how to deploy a Python script as a Data Flow application. Unlike the other applications types (source, processor, or sink), Data Flow does not set deployment properties that wire up producers and consumers when deploying the app application type. It is the developer’s responsibility to "wire up" the multiple applications when deploying, in order for them to communicate by using deployment properties.

The recipe creates a data-processing pipeline that dispatches input stream of timestamps to either even or odd downstream channels. Technically, the recipe implements the Dynamic Router integration pattern. The pipeline takes timestamps messages from an timeDest input channel. Depending on the timestamp value, it routes the message to one of the dedicated evenDest or oddDest downstream channels.

The following diagram shows the architecture of the data-processing pipelines:

SCDF Python Tasks

As a timestamp source, the application uses the prebuilt Time Source application but registers it as the Data Flow App type. It continuously emits timestamps to a downstream Kafka topic called timeDest.

The Router app, implemented by the Python script and packaged as a Docker image, consumes the incoming timestamps from the timeDest Kafka topic and, according to the timestamp value, routes the messages downstream to either the evenDest Kafka topic or the oddDest Kafka topic.

The Even Logger and Odd Logger components are prebuilt Log Sink applications but are registered as Data Flow App type. Loggers consume the evenDest or oddDest topics and print the incoming message in on the console.

Apache Kafka is used as the messaging middleware.

Development

You can find the source code in the samples GitHub repository and download it as a zipped archive: polyglot-python-app.zip.

The pythonrouterapp.py implements the timestamp router application logic:

from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

from util.actuator import Actuator
from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic

class Router:

    def __init__(self, info, kafka_brokers, input_topic, even_topic, odd_topic):

        self.kafka_brokers = kafka_brokers
        self.input_topic = input_topic
        self.even_topic = even_topic
        self.odd_topic = odd_topic

        # Serve the liveliness and readiness probes via http server in a separate thread.
        Actuator.start(port=8080, info=info)

        # Ensure the output topics exist.
        self.__create_topics_if_missing([self.input_topic, self.even_topic, self.odd_topic])

        self.consumer = KafkaConsumer(self.input_topic, bootstrap_servers=self.kafka_brokers)
        self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)

    def __create_topics_if_missing(self, topic_names):
        admin_client = KafkaAdminClient(bootstrap_servers=self.kafka_brokers, client_id='test')
        for topic in topic_names:
            try:
                new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
                admin_client.create_topics(new_topics=[new_topic], validate_only=False)
            except TopicAlreadyExistsError:
                print ('Topic: {} already exists!')

    def process_timestamps(self):
        while True:
            for message in self.consumer:
                if message.value is not None:
                    if self.is_even_timestamp(message.value):
                        self.producer.send(self.even_topic, b'Even timestamp: ' + message.value)
                    else:
                        self.producer.send(self.odd_topic, b'Odd timestamp:' + message.value)

    @staticmethod
    def is_even_timestamp(value):
        return int(value[-1:]) % 2 == 0


Router(
    get_env_info(),
    get_kafka_brokers(),
    get_channel_topic('input'),
    get_channel_topic('even'),
    get_channel_topic('odd')
).process_timestamps()

If the print command is used inside the Python script, the output buffer must be flushed with sys.stdout.flush() to prevent it being filled up and causing disruption to the Kafka’s consumer-producer flow.

  • The kafka-python library is used to consume and produce Kafka messages. The process_timestamps method continuously consumes timestamps from the input channel and routes the even or odd values to the output channels.
  • The Actuator class inside the actuator.py utility is used to expose operational information about the running application, such as health, liveness, info, and so on. It runs an embedded HTTP server in a separate thread and exposes the /actuator/health and /actuator/info entry points to handle the Kubernetes liveness and readiness probes requests.
  • The arguments.py utility helps to retrieve the required input parameters from the command line arguments and environment variables. The utility assumes the default (that is, exec) entry point style. Note that Data Flow passes the Kafka broker connection properties as environment variables.

For the python_router_app.py to act as a Data Flow app, it needs to be bundled in a Docker image and uploaded to DockerHub. The following Dockerfile shows how to bundle a Python script into a Docker image:

FROM python:3.7.3-slim
RUN pip install kafka-python
RUN pip install flask
ADD /util/* /util/
ADD python_router_app.py /
ENTRYPOINT ["python","/python_router_app.py"]
CMD []

The Dockerfile installs the required dependencies, adds the python script (ADD python_router_app.py) and utilities (under the util folder), and sets the command entry.

Build

We now build the Docker image and push it to the DockerHub registry.

Checkout the sample project and navigate to the polyglot-python-app folder:

git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples
cd ./spring-cloud-dataflow-samples/dataflow-website/recipes/polyglot/polyglot-python-app/

From within the polyglot-python-app, build and push the polyglot-python-app Docker image to DockerHub:

docker build -t springcloud/polyglot-python-app:0.2 .
docker push springcloud/polyglot-python-app:0.2

Replace springcloud with your docker hub prefix.

Once published in Docker Hub, the image can be registered in Data Flow and deployed.

Deployment

Follow the installation instructions to set up Data Flow on Kubernetes.

Retrieve the Data Flow URL from minikube (minikube service --url scdf-server) and configure your Data Flow shell:

dataflow config server --uri http://192.168.99.100:30868

Import the SCDF time and log app starters and register the polyglot-python-app as python-router of type app:

app register --name time --type app --uri docker:springcloudstream/time-source-kafka:2.1.0.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:time-source-kafka:jar:metadata:2.1.0.RELEASE

app register --name log --type app --uri docker:springcloudstream/log-sink-kafka:2.1.1.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:log-sink-kafka:jar:metadata:2.1.1.RELEASE

app register --type app --name python-router --uri docker://springcloud/polyglot-python-app:0.2

The docker://springcloud/polyglot-python-app:0.2 is resolved from the DockerHub repository.

Create the timestamp routing Stream pipeline:

stream create --name timeStampStream --definition "time || python-router || evenLogger: log || oddLogger: log"

The stream definitions shown earlier make use of the label feature in the DSL.

As a result, the following stream pipeline is created:

timeStampStream un-deployed

The time, log, and python-router apps are registered as App type applications and, therefore, can have multiple input and output bindings (that is, channels). Data Flow does not make any assumptions about the flow of data from one application to another. It is the developer’s responsibility to "wire up" the multiple applications when deploying in order for them to communicate.

Keeping this in mind, we deploy the timestamp stream pipeline with the deployment properties in the polyglot-python-app-deployment.properties file:

stream deploy --name timeStampStream --propertiesFile <polyglot-python-app folder>/polyglot-python-app-deployment.properties

The deployment properties defines the Kafka topics used to wire the time, python-router, and logger applications:

app.time.spring.cloud.stream.bindings.output.destination=timeDest

app.python-router.spring.cloud.stream.bindings.input.destination=timeDest
app.python-router.spring.cloud.stream.bindings.even.destination=evenDest
app.python-router.spring.cloud.stream.bindings.odd.destination=oddDest

app.evenLogger.spring.cloud.stream.bindings.input.destination=evenDest
app.oddLogger.spring.cloud.stream.bindings.input.destination=oddDest

The app.python-router.xxx prefix is a Data Flow convention to map the properties specified after the prefix to the python-router app in the timeStampStream stream.

The timestamp channel is bound to the timeDest Kafka topic. The router's even output channel is bound to the evenDest topic, and the odd channel is bound to the oddDest topic. After the deployment, the data flow looks like this:

timeStampStream deployed

  • Use the kubectl get all command to list the statuses of the deployed k8s containers. Use kubectl logs -f xxx to observe the even and odd pipeline output.

    For example, the kubectl logs -f po/timestampstream-evenlogger-xxx should output:

    2019-05-17 17:56:36.241  INFO 1 --- log-sink   : Even timestamp:05/17/19 17:56:36
    2019-05-17 17:56:38.301  INFO 1 --- log-sink   : Even timestamp:05/17/19 17:56:38
    2019-05-17 17:56:40.402  INFO 1 --- log-sink   : Even timestamp:05/17/19 17:56:40
    ...

    The kubectl logs -f po/timestampstream-oddlogger-xxx should output:

    2019-05-17 17:56:37.447  INFO 1 --- log-sink   : Odd timestamp:05/17/19 17:56:37
    2019-05-17 17:56:39.358  INFO 1 --- log-sink   : Odd timestamp:05/17/19 17:56:39
    2019-05-17 17:56:41.452  INFO 1 --- log-sink   : Odd timestamp:05/17/19 17:56:41
    ...