
How To Use SingleStore With Spark ML for Fraud Detection

Source: https://dzone.com/articles/how-to-use-singlestore-with-spark-ml-for-fraud-det-2

Abstract

In this final part of our Fraud Detection series, we’ll use Spark to build a Logistic Regression model from data stored in SingleStore.

The notebook files used in this article series are available on GitHub in DBC, HTML, and iPython formats.

Introduction

This is a multi-part article series and it is structured as follows:

  1. Configure Databricks CE.
  2. Load the Credit Card data into SingleStore.
  3. Create and evaluate a Logistic Regression model.

In the first part of this Fraud Detection series, we created and configured a Databricks CE cluster. In the second part, we loaded the credit card data into our Spark environment. This third and final article covers Part 3: creating and evaluating a Logistic Regression model. If you are following along with this series, please ensure that you have successfully completed the setup and requirements described in the two previous articles.

According to Andrea Dal Pozzolo, who was involved in the collection of the original dataset we are using for this example use case, fraud detection is a classification problem. Also, since investigators may only review a limited number of transactions, the probability that a transaction is fraudulent is more important than the true classification. Therefore, a good algorithm to use for the initial analysis is Logistic Regression. This is because the outcome has only two possible values.

Fill Out the Notebook

Let’s now create a new notebook. We’ll call it Fraud Detection using Logistic Regression. We’ll attach our new notebook to our Spark cluster.

In the first code cell, let’s add the following:

Python
%run ./Setup

This runs the Setup notebook that we created previously. We need to ensure that the server address and password for our SingleStore Managed Service cluster have been added to that notebook.
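
For reference, the Setup notebook is expected to define the cluster and password variables used in the next cell. The following is a minimal sketch of what it might contain; the placeholder values are assumptions and should be replaced with the details of your own SingleStore Managed Service cluster.

Python
# A minimal sketch of the Setup notebook created in the earlier articles.
# Replace the placeholder values with your own cluster details.
cluster = "<TO DO>"   # DDL endpoint of the SingleStore Managed Service cluster
password = "<TO DO>"  # password for the admin user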

In the next code cell, we’ll set some parameters for the SingleStore Spark Connector, as follows:

Python
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")

These are parameters for the SingleStore cluster, username, password, and whether Pushdown is enabled or disabled. We’ll discuss Pushdown in a separate article.

In the next code cell, let’s read the data from the SingleStore table into a Spark Dataframe, as follows:

Python
%%time

df = (spark.read
      .format("singlestore")
      .load("fraud_detection.credit_card_tx"))

Using %%time in the code cell allows us to measure the duration of the read operation. It should take just milliseconds to complete.

In the next code cell, we’ll get the number of rows:

Python
df.count()

This value should match the result we obtained in the previous article. In the next code cell, we’ll drop any null values and then count the number of rows again, as follows:

Python
df = df.dropna()
df.count()

The result should show that there are no null values.

As previously mentioned, the dataset is highly skewed. There are a number of techniques we can use to manage a skewed dataset. The initial approach we’ll take is to under-sample: we’ll keep all 492 fraudulent transactions and reduce the number of non-fraudulent transactions. There are several ways we could perform this reduction:

  • Randomly select majority class examples.
  • Select every nth row from the majority class examples (a sketch of this alternative appears after the random-sampling code below).

For our initial analysis, let’s use the first approach and select 1% of the majority class examples.

First, we’ll separate the two possible outcomes into two Dataframes in a code cell, as follows:

Python
is_fraud = df.select("*").filter("Class == 1")
no_fraud = df.select("*").filter("Class == 0")

In the next code cell, we’ll randomly sample 1% of non-fraudulent transactions, without replacement, as follows:

Python
no_fraud = no_fraud.sample(False, 0.01, seed = 123)
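
For reference, the second option listed earlier, taking every nth row of the majority class, could look like the following sketch. It is not used in the rest of this article, and the variable name no_fraud_nth is our own.

Python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep every 100th non-fraudulent transaction (roughly 1%) instead of
# sampling randomly. The window is not partitioned, so Spark will move
# all rows to a single partition; acceptable for a dataset of this size.
w = Window.orderBy("Time")

no_fraud_nth = (no_fraud
                .withColumn("row_num", F.row_number().over(w))
                .filter(F.col("row_num") % 100 == 0)
                .drop("row_num"))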

In the next code cell, we’ll concatenate the two Dataframes, sort on the Time column, and print out the number of rows:

Python
df_concat = no_fraud.union(is_fraud)
df = df_concat.sort("Time")
df.count()

In the next code cell, we’ll check the structure of the Dataframe:

Python
display(df)

Next, in a code cell we’ll create our train-test split:

Python
train, test = df.randomSplit([0.7, 0.3], seed = 123)
print("train =", train.count(), " test =", test.count())

Here we are using 70% and 30% for train and test, respectively.

The code in the following sections was inspired by a Logistic Regression example available on GitHub. In the next code cell, we’ll generate an is_fraud label column for the training data, using a UDF, as follows:

Python
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

is_fraud = udf(lambda fraud: 1.0 if fraud > 0 else 0.0, DoubleType())
train = train.withColumn("is_fraud", is_fraud(train.Class))

We are now ready to create and fit a Spark Machine Learning model:

Python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Create the feature vectors.
assembler = VectorAssembler(
  inputCols = [x for x in train.columns if x not in ["Time", "Class", "is_fraud"]],
  outputCol = "features")

# Use Logistic Regression.
lr = LogisticRegression().setParams(
    maxIter = 100000,
    labelCol = "is_fraud",
    predictionCol = "prediction")

model = Pipeline(stages = [assembler, lr]).fit(train)

For the VectorAssembler, we want to use the columns V1 to V28 as well as the Amount of the transaction. Therefore, we ignore the Time, Class, and is_fraud columns. Using Logistic Regression, we then create and fit our model.

Next, we’ll predict whether a transaction is fraudulent or not, using the test data, as follows:

Python
predicted = model.transform(test)

We can then show the predictions as follows:

Python
display(predicted)
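
Since, as noted in the introduction, the probability that a transaction is fraudulent can matter more than the predicted class, we could also inspect the probability column that Logistic Regression adds to the predictions. The following is a minimal sketch, assuming Spark 3.0 or later; the column name fraud_probability is our own.

Python
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array

# Extract the probability of the fraud class (index 1 of the probability
# vector) and list the transactions the model considers most likely fraudulent.
(predicted
 .withColumn("fraud_probability", vector_to_array("probability")[1])
 .select("Time", "Amount", "Class", "prediction", "fraud_probability")
 .orderBy(F.desc("fraud_probability"))
 .show(10))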

Finally, we’ll check the performance of our model using a confusion matrix:

Python
predicted = predicted.withColumn("is_fraud", is_fraud(predicted.Class))
predicted.crosstab("is_fraud", "prediction").show()

Overall, the results should show that our initial model makes good predictions. Because Data Science and Machine Learning are iterative processes, we can look for ways to improve and tune our classifier. For example, normalizing the data could be very useful and something to explore in the next iteration.
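
One way to explore normalization would be to add a StandardScaler stage to the pipeline. The following is a minimal sketch of that idea, reusing the assembler, imports, and training data from the cells above; it is not part of the original notebook, and the column name scaled_features is our own choice.

Python
from pyspark.ml.feature import StandardScaler

# Scale the assembled feature vector to zero mean and unit variance
# before fitting the Logistic Regression model.
scaler = StandardScaler(
    inputCol = "features",
    outputCol = "scaled_features",
    withMean = True,
    withStd = True)

lr_scaled = LogisticRegression().setParams(
    maxIter = 100000,
    featuresCol = "scaled_features",
    labelCol = "is_fraud",
    predictionCol = "prediction")

model_scaled = Pipeline(stages = [assembler, scaler, lr_scaled]).fit(train)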

Summary

In this article series, we have seen how easily SingleStore can be used with Spark. The key benefits of the SingleStore Spark Connector can be summarised as follows:

  • Implemented as a native Spark SQL plugin.
  • Accelerates ingest from Spark via compression.
  • Supports data loading and extraction from database tables and Spark Dataframes.
  • Integrates with the Catalyst query optimizer and supports robust SQL Pushdown.
  • Accelerates ML workloads.

In a future article, we'll explore External Functions and discuss how they could be useful for Machine Learning.

In the next article series, we’ll look at an example of Pushdown. Stay tuned!

