How to call Elasticsearch API using SSIS

Introduction

Elasticsearch is a powerful engine that allows you to store, aggregate and, most importantly, search data in a very analytical way. In this tutorial, you will learn how to bulk load data from SQL Server to Elasticsearch with SSIS (part of SQL Server) and ZappySys PowerPack.

The scope of this article will be to show how to import records from SQL Server into Elasticsearch index as JSON documents using Elasticsearch Bulk API. In a nutshell, we will retrieve IDs and names of the products from a SQL table, transform each row into a JSON and lastly, index each JSON in Elasticsearch under record’s corresponding ID. This tutorial is going to be your stepping stone to use any Elasticsearch API as a destination.

We will be using these SSIS components of PowerPack to make things work:

Let’s begin?

Prerequisites

  1. SSIS designer installed. Sometimes it is referred as BIDS or SSDT (download it from Microsoft site).
  2. Basic knowledge of SSIS package development using Microsoft SQL Server Integration Services.
  3. Northwind database deployed on your machine (we will use it to load data from; download it).
  4. Elasticsearch instance up and running.
  5. ZappySys SSIS PowerPack installed.
NOTE: If your ElasticSearch instance is hosted as AWS Managed ElasticSearch Instance then select   <New ZS-OAUTH Connection>. rather than ZS-HTTP (Explained later in this article). When OAuth UI launches select AWS v4 Provider. For more information on calling REST API on AWS check this article

Step-by-Step – Bulk loading data from SQL Server to Elasticsearch with SSIS

Load data from SQL Server database first

In this section you will perform several basic steps to start the package: load data from SQL Server and prepare it for further steps.

  1. Create a new SSIS package and drag a Data Flow Task into the Control Flow from the SSIS Toolbox.

    Dragging and dropping Data Flow Task into Control Flow

  2. Open Data Flow Task and then add OLE DB Source.
  3. Configure OLE DB Source to take data from Products table, Northwind database.
  4. Choose ProductID and ProductName as columns you will use.
Load data to Elasticsearch using SSIS and ZappySys Web API Destination component.

Load data to Elasticsearch using SSIS and ZappySys Web API Destination component.

Transform data into JSON documents

To add data into Elasticsearch firstly we need to prepare it with JSON Generator Transform. It will help us convert table rows into JSON documents.

JSON String to Create Index data for the Bulk API call

  1. Close the window and then drag Derived Column component from the SSIS Toolbox.
  2. Add two new columns and name them  RowHeader and  RowFooter:
    Derived Column configuration to add prefix and header and footer to a JSON

    Derived Column configuration to add prefix and header and footer to a JSON

    In the Expression column give them values:

    • RowHeader:
    • RowFooter:   "\n"
  3. Now drag JSON Generator Transform from the SSIS Toolbox and open it.
  4. Right mouse click on item Mappings and select Add Element(s) (Below this node):
    Convert SQL table rows into JSON using JSON Generator Transform

    Convert SQL table rows into JSON using JSON Generator Transform

  5. Once a dialog appears, select ProductName as Source Column and name as Output Alias:

    Map table column to JSON property using JSON Generator Transform

    Map table column to JSON property using JSON Generator Transform

  6. You should get a similar view:
    JSON Generator Transform configuration to create JSON from a SQL table

    JSON Generator Transform configuration to create JSON from a SQL table

  7. Then go to Header / Footer tab and:
    • Select Direct String option.
    • Input <%RowHeader%> in Header String option.
    • Input <%RowFooter%> in Footer String option.
    JSON Generator Transform configuration to convert rows into JSON suitable for Elasticsearch Bulk operation

    JSON Generator Transform configuration to convert rows into JSON suitable for Elasticsearch Bulk operation

  8. Close the window.

JSON String to Update Index data for the Bulk API call

From the above section “JSON String to Create Index data for the Bulk API call” follows all the steps From Step-1 to Step-8.

Just for (2nd step) Step-2 use below expression for   RowHeader and  RowFooter Derived Column configuration to add prefix and header and footer to a JSON

Derived Column configuration to add prefix and header and footer to a JSONIn the Expression column give them values:

  • RowHeader:
  • RowFooter:   "}\n"
NOTE: To learn how to make more complicated transforms visit these links:

JSON String to Delete Index data for the Bulk API call

From the above section “JSON String to Create Index data for the Bulk API call” follows steps Step-1 and Step-2.

NOTE: No need to use JSON Generator Transform (From Step-3).

Just for (2nd step) Step-2 use below expression for   RowHeader Derived Column configuration to add prefix and header and footer to a JSON

Derived Column configuration to add prefix and header and footer to a JSON

In the Expression column give them values:

  • RowHeader:

Finally, Elasticsearch & SQL Server integration

  1. Now drag Web API Destination SSIS component into the Data Flow and open it for editing.
  2. In Select Connection section press <New ZS-HTTP Connection>.
  3. Once HTTP Connection Manager window opens configure connection to your Elasticsearch instance:
    • Set Web Url, which points to your Elasticsearch instance.
    • Set Credentials Type to Basic - UserID/Password (or other appropriate authentication method).
    • Finally, set User Name and Password:

      Configure SSIS HTTP Connection to connect to Elasticsearch

      Configure SSIS HTTP Connection to connect to Elasticsearch

  4. Close configuration window. Now it’s time to configure Web API Destination.
  5. All that is separating you from getting those rows into Elasticsearch:
    • Setting Input Column for Body to ZS_JSON_OUT for Create and Update Index Data and for Delete Index Data Input Column for Body to RowHeader [Derived Column].
    • Setting URL to http://localhost:9200/_bulk.
    • Setting HTTP Request Method to POST.
    Index SQL Server Database data in Elasticsearch using SSIS component "Web API Destination"

    Index SQL Server Database data in Elasticsearch using SSIS component Web API Destination

  6. Then open Batch Settings (For Body) tab:
    • Enable batch submission.
    • Set Body Batch Size, e.g. 1000Make sure this number is even, otherwise you may get into problems (Elasticsearch Bulk request has special JSON request body format, as you perhaps noticed).
    Web API Destination batch settings configuration

    Web API Destination batch settings configuration

  7. Close the window and run the package! You should see green lights, telling you everything is OK:
    Successful data load from SQL Server to Elasticsearch

    Successful data load from SQL Server to Elasticsearch

  8. We can also query Elasticsearch by using one of its Search APIs – URI Search to see if we successfully indexed data:
    SQL Server data index in Elasticsearch

    SQL Server data index in Elasticsearch

  9. Rows number match in SSIS and in search results, thus everything is good. You are done.

What if I want more? After upserting data use Web API Destination further

Let’s say you have a requirement to do something with rows that were freshly indexed in Elasticsearch. Then you will need somehow to distinguish between created and updated records in Elasticsearch. And that won’t be difficult because Web API Destination acts not only as Destination but as Transformation as well. So one thing you have to do is connect JSON Parser Transform downstream to Web API Destination. It will parse Elasticsearch HTTP JSON response – which is returned by Web API Destination – into columns, which you can later easily redirect using Conditional Split:

  1. Add JSON Parser Transform and connect it to Web API Destination:
    Add JSON Parser Transform to get Elasticsearch HTTP JSON response when integrating SQL Server & Elasticsearch

    Use JSON Parser Transform to get Elasticsearch HTTP JSON response when integrating SQL Server & Elasticsearch

  2. Make sure you have selected ResponseText as Select Input JSON Column.
  3. Then go to Bulk API and copy/paste “the result of this bulk operation” JSON response:
    Using JSON Parser Transform to parse JSON response from REST HTTP request

    Using JSON Parser Transform to parse JSON response from REST HTTP request

  4. Set $.items[*] as the filter.
  5. Optional step. You won’t find a response sample for every Elasticsearch API call. In that case, you may want to use Postman to make an HTTP request to Elasticsearch and get a sample response:
    Use Postman to make a request to Elasticsearch and get a sample response to be used in JSON Parser Transform

    Use Postman to make a request to Elasticsearch and get a sample response to be used in JSON Parser Transform

    NOTE: Don’t forget to set the username and password in Authorization section!
  6. Then use a Conditional Split and Trash Destination to redirect the rows:

    Input these clauses:

    • Inserted:
    • Updated:
  7. As a result, new index records will be redirected to one destination, while updated records – to the other:
    "Result
NOTE: Trash Destination is a handy component of ZappySys PowerPack which can be used as dummy destination when we don’t care about the destination (and we don’t care in this tutorial) 🙂

Overall you can use Web API Destination HTTP JSON response for other useful things as well, e.g. determine on how many replica shards record was indexed. JSON response will depend on which Elasticsearch API and which method you use.

Delete Index by making an API call.

If you want to delete the index by making Delete API call you can. Let’s make that call using the Rest API Task. Configure it like below screen and click on the Test Request button.

Rest API Delete Method

Rest API Delete Method

Things gone bad: Error handling & debugging

Incidentally, you may incorrectly construct JSON for Web API Destination bodyElasticsearch nodes may go offline or go out of memory. In any case you may want to know when that happens and take actions accordingly. For that purpose you have to redirect failed requests from Web API Destination to some other destination:

  1. Add a Derived Column above Web API Destination with expression (DT_WSTR,4000)ZS_JSON_OUT and name it JsonAsString. This will let you see what JSON you are actually passing.
  2. Then add database or file destination or use another Trash Destination for debugging purposes and redirect the bad rows (red arrow) from Web API Destination into it. Don’t forget to set Redirect row option for both, Error and Truncation columns:
    "Redirect
  3. Finally, add a Data Viewer for the red path, if you want to debug the flow. You will be able to see URL, JSON and the error message for each record. You may want to copy-paste ErrorMessage to Notepad if you want it to be more readable:
    Use Data Viewer to view HTTP requests that failed to be fulfilled in Elasticsearch

    Use Data Viewer to view HTTP requests that failed to be fulfilled in Elasticsearch

NOTE: You can read more about redirecting rows in SSIS Error Handling (Redirect bad rows) article.

Call ElasticSearch API hosted on AWS

If your ElasticSearch instance is hosted as AWS Managed ElasticSearch Instance then select   <New ZS-OAUTH Connection>. rather than ZS-HTTP (Explained later in this article). When OAuth UI launches select AWS v4 Provider. For more information on calling REST API on AWS check this article.

Configure SSIS OAuth Connection - Use Amazon AWS API Provider, Enter Access Key, Secret Key

Configure SSIS OAuth Connection – Use Amazon AWS API Provider, Enter Access Key, Secret Key

 

Call AWS Hosted ElasticSearch REST API in SSIS (V4 Request Signing)

Call AWS Hosted ElasticSearch REST API in SSIS (V4 Request Signing)

Common Errors

Truncation related error

The most common error you may face when you run an SSIS package is truncation error. During the design time only 300 rows are scanned from a source (a file or a REST API call response) to detect datatypes but at runtime, it is likely you will retrieve far more records. So it is possible that you will get longer strings than initially expected. For detailed instructions on how to fix common metadata related errors read an article "How to handle SSIS errors (truncation, metadata issues)".

Authentication related error

Another frequent error you may get is an authentication error, which happens when you deploy/copy a package to another machine and run it there. Check the paragraph below to see why it happens and how to solve this problem.

Deployment to Production

In SSIS package sensitive data such as tokens and passwords are by default encrypted by SSIS with your Windows account which you use to create a package. So SSIS will fail to decrypt tokens/passwords when you run it from another machine using another Windows account. To circumvent this when you are creating an SSIS package which uses authentication components (e.g. an OAuth Connection Manager or an HTTP Connection Manager with credentials, etc.), consider using parameters/variables to pass tokens/passwords. In this way, you won’t face authentication related errors when a package is deployed to a production server.

Check our article on how to configure packages with sensitive data on your production or development server.

Download a sample package

Be sure to download a sample SQL Server 2008 SSIS package, in case you want to try it right away (you can upgrade it to a higher version).

NOTE: Once you open the package, you won’t be able to run it immediately, but don’t panic. Just configure OLE DB Source to point to your Northwind database and set URLs to point to your Elasticsearch instance.

Conclusion. What’s next?

In this article we have learned how to bulk load data from SQL Server and upsert it in Elasticsearch index. We used JSON Generator Transform to help us convert database table rows into JSON documents. In addition, Web API Destination was very helpful in automatically creating and making HTTP requests to Elasticsearch instance and indexing our data as a result. This tutorial was quite simple and straightforward, yet it is a good starter to use other Elasticsearch APIs and their features. From now on Elasticsearch reference is your best friend in learning what APIs exist, what are their methods and how HTTP requests should look like.

Keywords

How to import data from SQL Server to Elasticsearch | How to load data into Elasticsearch | Using Elasticsearch to index SQL Server | Elasticsearch and SQL Server integration | How to use Elasticsearch together with SQL Server | Upsert SQL Server data into Elasticsearch

Posted in REST API Integration, SSIS JSON Parser Transform, SSIS JSON Source (File/REST), SSIS REST API Task and tagged , , , , , , .