
Using SingleStore Pipelines With Kafka, Part 3 - DZone Big Data

Source: https://dzone.com/articles/how-to-use-singlestore-pipelines-with-kafka-part-3

Abstract

This article is the third and final part of our Pipelines series. We'll look at replacing the Consumer part of our Producer-Consumer application with a compelling SingleStore feature called Pipelines.

The SQL scripts, Java code, and notebook files used in this article series are available on GitHub. The notebook files are available in DBC, HTML, and iPython formats.

Introduction

This is a three-part article series, and this third article covers Part 3, Demonstrate SingleStore Pipelines.

SingleStore Pipelines

Pipelines allow us to create streaming ingest feeds from various sources, such as Apache Kafka™, Amazon S3, and HDFS, using a single command. With Pipelines, we can perform ETL operations:

  1. Extract. Pull data from various sources without the need for additional middleware.
  2. Transform. Map and enrich data using transformations (a brief sketch follows this list).
  3. Load. Guarantee message delivery and eliminate duplicates.
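For example, the Transform step can run an external program against each record before it is loaded. The sketch below is illustrative only: the broker, script URL, and Pipeline name are placeholders, and the Pipeline we build later in this article does not use a transform.

-- Illustrative sketch only: attach a hypothetical transform script to a Pipeline.
-- The second and third arguments (executable name and its arguments) are left empty
-- because the URL here points directly at the script.
CREATE PIPELINE IF NOT EXISTS kafka_with_transform AS
LOAD DATA KAFKA 'host:port/topic'
WITH TRANSFORM ('http://example.com/temp-transform.py', '', '')
INTO TABLE temperatures
FORMAT CSV
FIELDS TERMINATED BY ',';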

Visually, Figure 1 shows our SingleStore Pipelines architecture.


Figure 1. SingleStore and Kafka using Pipelines.

For our use case, we can create a simple Pipeline in SingleStore as follows:

USE sensor_readings;

CREATE PIPELINE IF NOT EXISTS kafka_confluent_cloud AS
LOAD DATA KAFKA '{{ BROKER_ENDPOINT }}/temp'
CONFIG '{
   "security.protocol" : "SASL_SSL",
   "sasl.mechanism" : "PLAIN",
   "sasl.username" : "{{ CLUSTER_API_KEY }}"}'
CREDENTIALS '{
   "sasl.password" : "{{ CLUSTER_API_SECRET }}"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE temperatures
FORMAT CSV
FIELDS TERMINATED BY ',';

We need to add values for {{ BROKER_ENDPOINT }}, {{ CLUSTER_API_KEY }}, and {{ CLUSTER_API_SECRET }}. We covered how to find these in the previous article in this series.

We specify the temp topic name on line 2 of the Pipeline code. We also specify that we are using CSV format and that the comma character separates the fields.
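For reference, the Pipeline maps each CSV record positionally onto the columns of the target table. The actual temperatures table was created earlier in this series, so the sketch below is only an assumption based on the queries that appear later in this article, not the exact schema.

USE sensor_readings;

-- Assumed, illustrative schema only; the real table was defined earlier in the series.
-- A unique or primary key on the table is implied by SKIP DUPLICATE KEY ERRORS above.
CREATE TABLE IF NOT EXISTS temperatures (
    sensorid INT,
    temp DOUBLE
);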

Next, we need to start the Pipeline. This can be done as follows:

START PIPELINE kafka_confluent_cloud;
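As an aside, if we want to preview what the Pipeline would ingest without writing any rows to the table, SingleStore provides a TEST PIPELINE command. A minimal sketch, typically run before the Pipeline is started (LIMIT restricts how much data is sampled):

-- Dry-run the Pipeline: extract and display a sample of the data without loading it.
TEST PIPELINE kafka_confluent_cloud LIMIT 1;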

We can check the Pipeline using:

SHOW PIPELINES;
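If ingestion problems occur, the errors recorded for our Pipelines can be inspected through the information_schema views. A minimal sketch (the exact columns returned may vary by SingleStore version):

-- Inspect any errors recorded during Pipeline ingestion.
SELECT *
FROM information_schema.PIPELINES_ERRORS;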

The Pipeline we defined and started will ingest the message data directly from Confluent Cloud into our SingleStore database. As we can see, the architecture has been simplified compared to the approach using JDBC.

Example Queries

Now that we have built our system, we can start running queries, such as counting the readings for each sensor where the temperature is within a specific range:

USE sensor_readings;

SELECT sensorid, COUNT(*)
FROM temperatures
WHERE temp > 70 AND temp < 100
GROUP BY sensorid
ORDER BY sensorid;

Or finding each sensor's maximum temperature within particular latitude and longitude bounds:

USE sensor_readings;

SELECT MAX(temp) AS max_temp, sensorid
FROM temperatures AS t
JOIN sensors AS s ON t.sensorid = s.id
WHERE s.latitude >= 24.7433195 AND s.latitude <= 49.3457868 AND
      s.longitude >= -124.7844079 AND s.longitude <= -66.9513812
GROUP BY sensorid
ORDER BY max_temp DESC;

We can also use the geospatial features of SingleStore to find the landmasses where sensors are located:

USE sensor_readings;

SELECT continents.name AS continent, sensors.name AS sensor_name
FROM continents
JOIN sensors
ON GEOGRAPHY_CONTAINS(continents.geo, sensors.location)
ORDER BY continents.name, sensors.name;

Summary

Pipelines are a compelling feature of SingleStore. We have only implemented a small example but have immediately realized the benefits of a simplified architecture. The key benefits of Pipelines include:

  • Rapid parallel loading of data into a database.
  • Live de-duplication for real-time data cleansing.
  • Simplified architecture that eliminates the need for additional middleware.
  • Extensible plugin framework that allows customizations.
  • Exactly-once semantics, critical for enterprise data.

More details about Pipelines can be found on the SingleStore Documentation website.

