Getting started with KSQL for Kafka stream processing

Updated: January 30, 2024 By: Guest Contributor Post a comment

Introduction

Kafka Streams is a powerful tool for building real-time data processing pipelines and applications. With the rise of streaming data and the need to process it on the fly, Kafka and its stream processing capabilities have become integral to modern data architecture. KSQL is an SQL-like language for Kafka Streams that makes it easier to read, write, and manage stream processing jobs atop Kafka’s underlying platform.

In this tutorial, we’ll dive into the basics of KSQL: what it is, how it works, and how you can use it to process and analyze your streaming data in real time. We’ll start from setting up the environment to some essential concepts and finally to advanced stream processing.

Setting Up the KSQL Environment

Before we start writing KSQL queries, you need a running Kafka cluster and a KSQL server. You can download and install Apache Kafka from the official website, which includes KSQL as part of the Confluent Platform. For demonstration purposes, we’ll use the Confluent Platform which includes everything you need.

Once you have Kafka and KSQL up and running, you can interact with KSQL via the command line interface (CLI), REST API, or Confluent’s Control Center UI. We will focus on using the CLI in this tutorial.

Basic KSQL Operations

With your environment ready, let’s write our first KSQL statement. KSQL understands many SQL elements, but it’s built for stream processing, so there are unique attributes.

First, we must create a KSQL Stream. Think of a Stream as a topic with a defined schema. Here’s a simple example:

CREATE STREAM user_actions (
    userid VARCHAR,
    action VARCHAR
) WITH (
    KAFKA_TOPIC = 'user_actions_topic',
    VALUE_FORMAT = 'JSON'
);

This command creates a Stream called user_actions that reads from the user_actions_topic Kafka topic. You’ll need to ensure that this topic exists in your Kafka cluster before you can create a Stream upon it.

Once the Stream is created, you can start writing queries against it. Here’s how you can formulate a simple select query:

SELECT userid, action FROM user_actions EMIT CHANGES;

This query selects all columns from the user_actions stream and emits changes as they arrive.

Connecting Streams to Databases

KSQL also provides mechanisms to sink data from Kafka topics to external databases. The following example sets up a connection to an external mySQL database:

CREATE SINK CONNECTOR MY_SINK CONNECTOR WITH (
    'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url' = 'jdbc:mysql://localhost:3306/my_db',
    'topics' = 'user_actions_topic',
    'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter' = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://localhost:8081'
);

After running this connector, your Kafka data will start flowing into your mySQL database allowing for easier analysis and integration with other systems.

Stream Processing with KSQL

KSQL excels at processing data in real-time. You can create complex pipelines that transform, aggregate, and join streams of data. Here’s an example where we combine multiple streams to create a new enriched stream:

CREATE STREAM enriched_user_actions AS
SELECT u.userid, u.action, d.device_name
FROM user_actions u
LEFT JOIN device_info d ON u.userid = d.user_id
EMIT CHANGES;

This query joins the user_actions stream with a device_info stream where the userid matches the user_id, creating a new stream with additional device information.

Windowed Aggregations

One of the advanced features in KSQL is the ability to perform windowed aggregations. This allows you to, for example, count events within a certain time frame. Have a look at the following query:

SELECT userid, COUNT(*)
FROM user_actions
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY userid
EMIT CHANGES;

This statement counts the number of actions per user, grouped into hourly intervals. Windowed aggregations are a powerful feature for temporal analysis of your data.

Advanced Stream to Stream Joins

As your KSQL fluency grows, you might need to perform more complex operations, such as stream-stream joins. Here’s an example:

CREATE STREAM vip_actions AS
SELECT ua.userid, ua.action
FROM user_actions ua
INNER JOIN vip_users vu ON ua.userid = vu.userid
EMIT CHANGES;

This statement will output a new stream of actions performed by VIP users by joining the user_actions stream with a vip_users stream based on the userid.

Handling Late Data

One challenge in stream processing is dealing with late-arriving data. KSQL offers mechanisms to handle such scenarios:

SELECT userid, action
FROM user_actions
WINDOW TUMBLING (SIZE 5 MINUTES, GRACE PERIOD 1 MINUTE)
EMIT CHANGES;

In this query, the GRACE PERIOD specifies how much later data can be seen as part of the current window. Data arriving after this grace period won’t be included in the window’s results.

Conclusion

KSQL therefore provides a robust SQL-like language that eases the complexity of stream processing in Kafka. Its simple syntax, combined with Kafka’s scalability, makes it an essential tool for real-time analytics. In this tutorial, we introduced key concepts and practical examples that can serve as a starting point in your KSQL journey.