
Streaming pipeline using Kafka

Introduction

Recently I started working on a Data Engineering project where I was assigned the task of learning about Apache Kafka and setting up the infrastructure to consume different feeds in real time. This was a great challenge for my skills, mainly because I'm not very experienced, but also because my background is more related to Machine Learning than to Data Engineering. So, as soon as I had everything set up, I was eager to show how I managed to create a pipeline using Kafka and, hopefully, be helpful to others who may go through the same process in the future.

The idea is to show how to implement an end-to-end pipeline using Python and Amazon Managed Streaming for Apache Kafka (MSK).

Overall, streaming data empowers machine learning applications to operate in real time, handle massive volumes of data, adapt to dynamic environments, detect anomalies, continually learn and improve, and provide personalized experiences. Its integration with machine learning enables the development of intelligent systems capable of making timely, data-driven decisions in today's fast-paced world.

So, what's Kafka?

The best way to introduce it is by citing the Confluent web page.

"Apache Kafka is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale." [...]

Kafka provides a system with high throughput, scalability, low latency, storage, and availability. This is very useful in the world we live in, as real-time streaming data is fundamental today: many apps that we use every day, such as LinkedIn, Uber, and Spotify, use or have used Kafka.

Kafka provides three main functions to its users:

1. To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.

2. To store streams of events durably and reliably for as long as you want.

3. To process streams of events as they occur or retrospectively.

Now that we have an understanding of Kafka, let's delve into the pipeline we aim to establish. The primary objective of the pipeline is to consume data from a real-time streaming feed. Although we won't be building a specific application using this data in this article, you can utilize the pipeline to send data to a database, train machine learning models, or transform it for ETL processes.

The pipeline we are going to set up:
  1. Stream API: We will develop a custom API using the Python Flask library, which will be hosted on an EC2 instance in AWS. This API will serve as the data source for our consumption.
  2. Kafka Producer: We will create a straightforward Python script that acts as a producer for the Kafka cluster, responsible for consuming the data from the stream API and sending it to the cluster. This script will run on an EC2 instance within the AWS environment.
  3. Kafka Cluster: The Kafka cluster serves as the storage and computational backbone for our event-driven platform. It will consist of multiple brokers and partitions designed to support our publish/subscribe system. This cluster will be hosted on AWS using the Managed Streaming for Apache Kafka (MSK) service.
  4. Kafka Consumer: As an illustration of data consumption from the cluster, we will implement a Kafka Consumer API on another EC2 instance. This consumer will demonstrate how data can be retrieved and processed from the Kafka cluster.

MSK setup

To set up MSK, we will follow the tutorial provided by AWS, making a few modifications for our purposes.

Step 1. Create an Amazon MSK cluster

In the "Create a cluster" section, I suggest:

  • Decreasing the EBS storage to 20 GB.
  • Opting for the "kafka.t3.small" broker type.
    These modifications aim to maximize the utilization of cost-efficient resources.

Additionally, in the "Security settings" section: 

  • Choose "Plaintext" for access control methods and encryption.
    While this configuration is not recommended for production clusters, it suffices for testing our pipeline and simplifies the setup process. It's worth noting that we will be using the Confluent Kafka Python library for producing messages, which does not support IAM role-based authentication for the Kafka cluster.
Step 2. Create an IAM role
  • The section on creating an IAM role will be omitted since, as mentioned earlier, we won't be utilizing this authentication method.
Step 3. Create a client machine
  • No specific variations or changes are recommended for setting up the client machine.
Step 4. Create a topic
  • To create a test topic, follow the fourth step with a slight modification.
  • Instead of creating the client.properties file described in the tutorial, simply save a file containing the single property security.protocol=PLAINTEXT. (If you prefer, you can also create the topic from Python, as sketched below.)
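
If you would rather create the topic from Python than from the Kafka CLI, the confluent_kafka package (the same library we will use for the producer) also exposes an admin client. The snippet below is a minimal sketch under the same plaintext setup; the <bootstrap.servers> placeholder and the replication factor of 2 are assumptions you should adapt to your own cluster.

    from confluent_kafka.admin import AdminClient, NewTopic

    # Assumption: replace <bootstrap.servers> with the plaintext
    # bootstrap string shown in the MSK cluster properties
    admin = AdminClient({
        "bootstrap.servers": "<bootstrap.servers>",
        "security.protocol": "PLAINTEXT",
    })

    # The replication factor must not exceed the number of brokers
    futures = admin.create_topics(
        [NewTopic("msk-demo-topic", num_partitions=1, replication_factor=2)]
    )

    for topic, future in futures.items():
        try:
            future.result()  # block until the topic is created (or fails)
            print(f"Topic '{topic}' created")
        except Exception as e:
            print(f"Failed to create topic '{topic}': {e}")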

Finally, in the next part, we will produce and consume messages from this topic.

Stream API

Having a Stream API is crucial in modern data-driven environments for several reasons. 

  • Enables real-time data processing, allowing organizations to extract insights and take immediate actions as data streams in. This real-time capability is invaluable in industries such as finance, e-commerce, and IoT, where timely decision-making is critical. 
  • Empowers the implementation of event-driven architectures, where events trigger actions and workflows. This enables greater flexibility, responsiveness, and scalability in system design. 
  • Enables the integration of streaming data with machine learning and AI algorithms, allowing organizations to build intelligent systems that learn, adapt, and make predictions in real time. 
  • Plays a vital role in anomaly detection, monitoring, and personalized experiences by analyzing data as it arrives, identifying patterns, outliers, and deviations from normal behavior. 

Overall, having a Stream API empowers organizations to leverage real-time data, make data-driven decisions, and gain a competitive advantage in today's fast-paced and dynamic business landscape.

To simulate a real-time streaming API, we are going to use the Flask library in Python. Code snippet below:

  
    import json
    import threading
    import time

    from flask import Flask, Response

    app = Flask(__name__)

    data_queue = []          # shared buffer between the generator and the endpoint
    app_running = True       # flag used to stop the background generator
    events_per_second = 1    # controls how fast data points are generated

    # Define a generator function that continuously yields data
    def generate_data():
        start_time = time.time()
        counter = 0
        try:
            while app_running:
                timestamp_difference = int(time.time() - start_time)
                data = {'counter': counter,
                        'timestamp': timestamp_difference}
                data_queue.append(data)
                counter += 1
                # Adjust the delay between each data update
                # as per your needs
                time.sleep(1 / events_per_second)
        except Exception as e:
            print("An error occurred during data generation:", str(e))

    # Define a route to stream data from the queue
    @app.route('/stream')
    def stream():
        def generate():
            # Yield any data already in the queue, then clear it
            # so the same events are not sent again below
            for data in list(data_queue):
                yield json.dumps(data) + '\n'
            data_queue.clear()

            while app_running:
                # Adjust the delay to control the frequency
                # of checking for new data
                time.sleep(0.1)
                if len(data_queue) > 0:
                    for data in list(data_queue):
                        yield json.dumps(data) + '\n'
                    data_queue.clear()

        return Response(generate(), mimetype='text/event-stream')

    if __name__ == '__main__':
        # Run the data generator in a background thread and start the API
        threading.Thread(target=generate_data, daemon=True).start()
        app.run(host='0.0.0.0', port=5000)

The Flask application is set up with an API endpoint that can be accessed to retrieve the streaming data. The data generation is decoupled from the API endpoint, allowing it to generate data independently in the background.

The generate_data() function is responsible for continuously generating data. It uses a timestamp to track the time difference in seconds since the start of data generation. Each data point consists of a counter and the timestamp. The generated data is stored in a queue for efficient retrieval.

The API endpoint /stream is defined to stream the data from the queue. When a client accesses this endpoint, the streaming response is initiated. Initially, it yields any existing data from the queue. Then, it enters a loop where it periodically checks for new data. If new data is available, it yields the new data and clears the queue. This approach ensures that both existing and real-time data can be consumed by the client.

The real-time streaming API code is now complete, providing the capability to generate and stream data. To utilize this streaming functionality effectively, we need to ingest the generated data into a Kafka cluster. This involves setting up a Kafka producer, which will consume the data from the generator and publish it to the Kafka cluster. By integrating the Kafka producer into our streaming API code, we establish a seamless data flow from the generator to the Kafka infrastructure, enabling real-time data ingestion for further processing and analysis.

Kafka Producer

In the following code snippet, we will utilize the confluent_kafka library, as mentioned earlier, to establish a connection with a Kafka cluster and set up a Kafka producer. The producer will play a crucial role in ingesting the data generated by the streaming API and sending it to the Kafka cluster for further processing.

  
    import json
    import os

    import requests
    from confluent_kafka import Producer

    def delivery_report(err, msg):
        # Delivery callback: report failures; the else clause can be
        # filled in to also log successes (see the monitoring section)
        if err is not None:
            print(f"Delivery failed for record: {err}")

    def main():
        # Replace STREAM_ENDPOINT with the URL of the streaming endpoint
        STREAM_ENDPOINT = "http://127.0.0.1:5000/stream"

        # Load the producer configuration (a JSON dictionary)
        # from the PRODUCER_CFG environment variable
        config = json.loads(os.getenv("PRODUCER_CFG"))
        kafka_producer = Producer(config)

        # Make a GET request to the streaming endpoint
        # with stream=True parameter to enable streaming
        response = requests.get(STREAM_ENDPOINT, stream=True)

        # Iterate over each line of the streaming response
        for line in response.iter_lines(chunk_size=512):

            # Skip empty lines
            if not line:
                continue

            kafka_producer.produce(topic="msk-demo-topic",
                                   value=line,
                                   callback=delivery_report)
            kafka_producer.poll(0)
        kafka_producer.flush()

    if __name__ == "__main__":
        main()

The main function initiates a GET request to the streaming API endpoint, enabling the streaming functionality. As the streaming response is received, the code iterates over each line of data and sends it to the Kafka cluster using the Kafka producer. The producer is configured with the necessary parameters retrieved from environment variables.

Kafka Consumer

To consume data from the Kafka cluster, we can utilize the Kafka Consumer API directly from the command-line interface (CLI). 

To initiate the consumer, run the following command from the Kafka installation folder:

../bin/kafka-console-consumer.sh --bootstrap-server <bootstrap.servers> --consumer.config client.properties --topic msk-demo-topic --from-beginning

  1. Obtain the appropriate <bootstrap.servers> value from the properties of the previously set up MSK Cluster.
  2. Create a valid "client.properties" configuration file, ensuring it includes the necessary settings.
  3. Specify the obtained <bootstrap.servers> value in the configuration file.
  4. Execute the consumption command, making use of the --from-beginning option to display all messages, including those produced earlier.

Executing this command will retrieve and display all the messages that have been produced and stored within the Kafka cluster, enabling you to review the data and perform any necessary analysis or processing.
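
As an alternative to the console consumer, you can also consume from Python with the same confluent_kafka library. The following is a minimal sketch, assuming the same plaintext cluster and topic; the consumer group name "msk-demo-consumer" is an arbitrary choice for this example.

    from confluent_kafka import Consumer

    # Assumption: replace <bootstrap.servers> with your MSK bootstrap string
    consumer = Consumer({
        "bootstrap.servers": "<bootstrap.servers>",
        "security.protocol": "PLAINTEXT",
        "group.id": "msk-demo-consumer",    # arbitrary group name for this example
        "auto.offset.reset": "earliest",    # mirrors the --from-beginning option
    })
    consumer.subscribe(["msk-demo-topic"])

    try:
        while True:
            msg = consumer.poll(1.0)        # wait up to one second for a message
            if msg is None:
                continue
            if msg.error():
                print("Consumer error:", msg.error())
                continue
            print(msg.value().decode("utf-8"))
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()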

Trying the setup and monitoring

To begin producing data, we need to copy both the API code and the producer code onto the EC2 instance and run them simultaneously in separate terminals. One terminal will show the API running, while the other can be used to add print statements for logging purposes. This allows us to monitor the code and ensure everything is running smoothly.

Within the producer code, you can enhance the delivery report function by implementing the else clause. This allows you to print a message for each successful event. Be aware that this may result in a flood of logs, but it can be useful for verifying that everything is functioning correctly.
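
For reference, a version of delivery_report with the else clause filled in could look like the sketch below; the exact log format is just an illustration.

    def delivery_report(err, msg):
        if err is not None:
            print(f"Delivery failed for record: {err}")
        else:
            # One log line per successfully delivered event (can be verbose)
            print(f"Record delivered to {msg.topic()} "
                  f"[partition {msg.partition()}] at offset {msg.offset()}")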

  1. Create a .env file that includes a PRODUCER_CFG parameter.
  2. Inside the PRODUCER_CFG parameter, define a dictionary containing the required configuration details.
  3. Include the necessary parameters, such as "security.protocol" and "bootstrap.servers," using the same values as before.

These configuration details will enable the producer to establish a secure connection with the Kafka cluster.
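
As an illustration, the PRODUCER_CFG entry could look like the line below (the bootstrap servers value is a placeholder). Since the producer reads it with os.getenv, the variable has to end up in the process environment, for example by exporting it in the shell or loading the .env file with a dotenv helper.

    PRODUCER_CFG={"bootstrap.servers": "<bootstrap.servers>", "security.protocol": "PLAINTEXT"}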

By following these steps, we can successfully set up and run the API, the producer and the consumer code on our EC2 instance. Monitoring the logs and verifying the successful delivery of events will provide reassurance that the process is functioning as expected.

API code running in an EC2 instance

In the API image, we can observe the running code of api.py and the requests being sent to the /stream endpoint. When it comes to the producer, no visible output will be displayed unless there is an error with the events being processed. On the other hand, in the consumer, we will be able to see all the events that were produced and stored in the Kafka cluster.

Producer code using confluent kafka library
Consumer code logging the events being consumed

To verify the successful transmission of data to the Kafka cluster, we can leverage CloudWatch Dashboards for monitoring purposes. By creating a dashboard, we gain access to a variety of metrics that allow us to track different aspects of our topics, brokers, consumers, connectors, and more.

CloudWatch Dashboard to monitor the MSK cluster

Among the useful metrics available, two stand out: BytesInPerSec and BytesOutPerSec. These metrics provide insights into the data flow, enabling us to assess the health of both the producer and consumer components.

For instance, by monitoring BytesInPerSec, we can confirm whether the producer is functioning correctly and sending data to the cluster as expected. Similarly, monitoring BytesOutPerSec helps us determine if the consumers are processing the data effectively.

The provided image serves as an example, showcasing the visualization of BytesInPerSec, BytesOutPerSec, and EventsInPerSec for a six-broker cluster. By leveraging CloudWatch Dashboards and monitoring the appropriate metrics, we can gain valuable insights into the performance and behavior of our Kafka ecosystem.
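
If you prefer to pull these metrics programmatically instead of through the console, the sketch below queries BytesInPerSec for a single broker with boto3; the cluster name "msk-demo-cluster" is a placeholder, and the one-hour window, period, and statistic are arbitrary choices for this example.

    from datetime import datetime, timedelta, timezone

    import boto3

    cloudwatch = boto3.client("cloudwatch")

    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=1)

    # Assumption: "msk-demo-cluster" is a placeholder for your MSK cluster name
    stats = cloudwatch.get_metric_statistics(
        Namespace="AWS/Kafka",
        MetricName="BytesInPerSec",
        Dimensions=[
            {"Name": "Cluster Name", "Value": "msk-demo-cluster"},
            {"Name": "Broker ID", "Value": "1"},
        ],
        StartTime=start,
        EndTime=end,
        Period=300,               # 5-minute buckets
        Statistics=["Average"],
    )

    for point in sorted(stats["Datapoints"], key=lambda p: p["Timestamp"]):
        print(point["Timestamp"], point["Average"])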

Conclusion

The pipeline we created enables the consumption of real-time streaming data from a variety of sources. Although we did not build a specific application in this article, the pipeline can be extended to send data to databases, train machine learning models, or perform ETL processes.

Overall, this project has showcased the importance and value of streaming data in the modern data-driven landscape. Streaming data enables organizations to leverage real-time insights, make data-driven decisions, and gain a competitive edge. By harnessing the power of Apache Kafka and the capabilities of the pipeline we established, organizations can build intelligent systems that adapt, learn, and process data in real time to drive innovation and success.
