ElasticSearch Connector for Talend Studio

In this article you will learn how to integrate ElasticSearch data in Talend Studio without coding, in just a few clicks, over a live, bi-directional connection to ElasticSearch. You can read and write ElasticSearch data inside your app and perform many ElasticSearch operations using the easy-to-use, high-performance API Connector for ElasticSearch.

Using ElasticSearch Connector you will be able to connect, read, and write data from within Talend Studio. Follow the steps below to see how we would accomplish that.


Create Data Source in ZappySys Data Gateway based on API Driver

  1. Download and install ZappySys ODBC PowerPack.

  2. Search for gateway in start menu and Open ZappySys Data Gateway:
    Open ZappySys Data Gateway

  3. Go to the Users tab to add our first Gateway user. Click Add, enter the name tdsuser, and set a password of your choice. Check the Admin option and click OK to save. We will use these details later when we create the connection in Talend Studio:
    ZappySys Data Gateway - Add User

  4. Now we are ready to add a data source. Click Add, give the data source a name (copy this name somewhere, we will need it later), and then select Native - ZappySys API Driver. Finally, click OK. This creates the data source and opens the ZappySys driver UI.

    ElasticSearchDSN

    ZappySys Data Gateway - Add Data Source

  5. When the Configuration window appears give your data source a name if you haven't done that already, then select "ElasticSearch" from the list of Popular Connectors. If "ElasticSearch" is not present in the list, then click "Search Online" and download it. Then set the path to the location where you downloaded it. Finally, click Continue >> to proceed with configuring the DSN:

    ElasticSearchDSN
    ElasticSearch
    ODBC DSN Template Selection

  6. Now it's time to configure the Connection Manager. Select the Authentication Type, e.g. Basic Authentication (UserId/Password). Then select the API Base URL (in most cases, the default one is the right one). More info is available in the Authentication section.

    Fill in all required parameters and set optional parameters if needed:

    ElasticSearchDSN
    ElasticSearch
    Basic Authentication (UserId/Password) [Http]
    http://localhost:9200
    Required Parameters
    Optional Parameters
    UserName Fill in the parameter...
    Password Fill in the parameter...
    IgnoreSSLCertificateErrors Fill in the parameter...
    ODBC DSN HTTP Connection Configuration

    Fill in all required parameters and set optional parameters if needed:

    ElasticSearchDSN
    ElasticSearch
    Windows Authentication (No Password) [Http]
    http://localhost:9200
    Required Parameters
    Optional Parameters
    IgnoreSSLCertificateErrors Fill in the parameter...
    ODBC DSN HTTP Connection Configuration

  7. Once the data source has been configured, you can preview data. Select the Preview tab and use settings similar to the following to preview data:
    ODBC ZappySys Data Source Preview

  8. Click OK to finish creating the data source.

Read ElasticSearch data in Talend Studio

To read ElasticSearch data in Talend Studio, we'll need to complete several steps. Let's get through them all right away!

This article is compatible with Talend Open Studio (a free version, currently retired by Qlik). If you don't have it, you can still purchase a shareware version of Talend Studio from Qlik.

Create connection for input

  1. First of all, open Talend Studio
  2. Create a new connection: Creating a new connection in Talend Studio
  3. Select Microsoft SQL Server connection: Creating SQL Server connection in Talend Studio
  4. Name your connection: Naming a connection in Talend Studio
  5. Fill in the connection parameters and then click Test connection:
    ElasticSearchDSN
    Configuring the ZappySys Data Gateway connection in Talend Studio
  6. If the List of modules not installed for this operation window shows up, then download and install all of them: Configure the connection
    Review and accept all additional module license agreements during the process
  7. Finally, you should see a successful connection test result at the end: Connection test successful

Add input

  1. Once we have a connection to ZappySys Data Gateway created, we can proceed by creating a job: Create a job in Talend Studio
  2. Simply drag and drop ZappySys Data Gateway connection onto the job: Creating an input based on ZappySys Data Gateway connection
  3. Then create an input based on ZappySys Data Gateway connection: Creating an input based on ZappySys Data Gateway connection
  4. Continue by configuring a SQL query and click Guess schema button: Configuring a SQL query in Talend Studio
  5. Finish by configuring the schema, for example: Configuring a schema in Talend Studio

Add output

We are ready to add an output. From Palette drag and drop a tFileOutputDelimited output and connect it to the input: Connecting tFileOutputDelimited output in Talend Studio

Run the job

Finally, run the job and integrate your ElasticSearch data: Integrating ElasticSearch data in Talend Studio

Advanced topics

Create Custom Stored Procedure in ZappySys Driver

You can create procedures to encapsulate custom logic and then pass only a handful of parameters, rather than a long SQL statement, to execute your API call.

Follow these steps to create a Custom Stored Procedure in ZappySys Driver. You can insert placeholders anywhere inside the procedure body.

  1. Go to Custom Objects Tab and Click on Add button and Select Add Procedure:
    ZappySys Driver - Add Stored Procedure

  2. Enter the desired Procedure name and click on OK:
    ZappySys Driver - Add Stored Procedure Name

  3. Select the created stored procedure, write the procedure body, and save it. This creates the custom stored procedure in the ZappySys Driver.
    Here is an example stored procedure for ZappySys Driver. You can insert placeholders anywhere inside the procedure body:

    CREATE PROCEDURE [usp_get_orders]
        @fromdate = '<<yyyy-MM-dd,FUN_TODAY>>'
     AS
        SELECT * FROM Orders where OrderDate >= '<@fromdate>';
    

    ZappySys Driver - Create Custom Stored Procedure

  4. That's it. Now go to the Preview tab and execute your stored procedure using the Exec command. In this example it extracts the orders starting from the date 1996-01-01:

    Exec usp_get_orders '1996-01-01';

    ZappySys Driver - Execute Custom Stored Procedure

  5. Let's generate the SQL Server query code to make the API call using the stored procedure. Go to the Code Generator tab, select SQL Server as the language, and click the Generate button to generate the code.
    If you have already created a linked server for this data source, copy the SELECT query and replace the [MY_API_SERVICE] placeholder with your linked server name.

    SELECT * FROM OPENQUERY([MY_API_SERVICE], 'EXEC usp_get_orders @fromdate=''1996-07-30''')

    ZappySys Driver - Generate SQL Server Query

  6. Now go to SQL Server and execute that query. It makes the API call using the stored procedure and returns the response.
    ZappySys Driver - Generate SQL Server Query
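
The doubled single quotes in the OPENQUERY statement above are easy to get wrong by hand, because the inner EXEC command is itself passed as a string literal. The following is a minimal Python sketch of that quoting rule only. The helper names `sql_literal` and `build_openquery` are hypothetical and are not part of the ZappySys driver or SQL Server; the sketch also assumes the linked server name contains no closing bracket.

```python
def sql_literal(text: str) -> str:
    """Wrap text in single quotes, doubling any embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def build_openquery(linked_server: str, inner_sql: str) -> str:
    """Embed inner_sql as the string argument of OPENQUERY (hypothetical helper)."""
    return f"SELECT * FROM OPENQUERY([{linked_server}], {sql_literal(inner_sql)})"


# The inner command already contains one quoted literal (the date).
inner = "EXEC usp_get_orders @fromdate=" + sql_literal("1996-07-30")
statement = build_openquery("MY_API_SERVICE", inner)
print(statement)
```

Each level of nesting doubles the quotes once more, which is why the date ends up wrapped in two single quotes on each side inside the outer literal.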

Create Custom Virtual Table in ZappySys Driver

ZappySys API Drivers support a flexible query language, so you can override the default properties you configured on the data source, such as URL or Body. This way you don't have to create multiple data sources if you want to read data from multiple endpoints. However, not every application supports supplying custom SQL to the driver; some only let you select a table from the list the driver returns.

If you're dealing with Microsoft Access and need to import data from an SQL query, note that Access doesn't allow direct import of SQL queries. Instead, you can create custom objects (Virtual Tables) to handle the import process.

Many applications, such as MS Access and Informatica Designer, won't give you an option to specify custom SQL when you import objects. In such cases a Virtual Table is very useful. You can create many Virtual Tables on the same data source (e.g. if you have 50 URLs with slight variations, you can create virtual tables that differ only in the URL parameter setting).

  1. Go to Custom Objects Tab and Click on Add button and Select Add Table:
    ZappySys Driver - Add Table

  2. Enter the desired Table name and click on OK:
    ZappySys Driver - Add Table Name

  3. The New Query window opens. Click Cancel to close that window and go back to the Custom Objects tab.

  4. Select the created table, set Text Type to SQL, write your desired SQL query, and save it. This creates the custom table in the ZappySys Driver.
    Here is an example SQL query for ZappySys Driver. You can also insert placeholders:

    SELECT
      "ShipCountry",
      "OrderID",
      "CustomerID",
      "EmployeeID",
      "OrderDate",
      "RequiredDate",
      "ShippedDate",
      "ShipVia",
      "Freight",
      "ShipName",
      "ShipAddress",
      "ShipCity",
      "ShipRegion",
      "ShipPostalCode"
    FROM "Orders"
    Where "ShipCountry"='USA'

    ZappySys Driver - Create Custom Table

  5. That's it. Now go to the Preview tab and execute your custom virtual table query. In this example it extracts only the orders whose shipping country is the USA:

    SELECT * FROM "vt__usa_orders_only"

    ZappySys Driver - Execute Custom Virtual Table Query

  6. Let's generate the SQL Server query code to make the API call using the virtual table. Go to the Code Generator tab, select SQL Server as the language, and click the Generate button to generate the code.
    If you have already created a linked server for this data source, copy the SELECT query and replace the [MY_API_SERVICE] placeholder with your linked server name.

    SELECT * FROM OPENQUERY([MY_API_SERVICE], 'SELECT * FROM "vt__usa_orders_only"')

    ZappySys Driver - Generate SQL Server Query

  7. Now go to SQL Server and execute that query. It makes the API call using the virtual table and returns the response.
    ZappySys Driver - Generate SQL Server Query
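
When many virtual tables differ only in one value, it can help to generate their names and SQL text in one place and then paste each pair into the Custom Objects tab. The Python sketch below does this for the Orders example above. The function name `virtual_table_for_country`, the extra country values, and the naming pattern (modeled on vt__usa_orders_only) are assumptions for illustration; the driver itself offers no such helper that this article documents.

```python
from typing import Tuple


def virtual_table_for_country(country: str) -> Tuple[str, str]:
    """Return a (virtual table name, SQL text) pair for one ShipCountry value."""
    name = f"vt__{country.lower().replace(' ', '_')}_orders_only"
    escaped = country.replace("'", "''")  # double single quotes inside the literal
    sql = f'SELECT * FROM "Orders" WHERE "ShipCountry"=\'{escaped}\''
    return name, sql


# Hypothetical list of countries; only USA appears in the article.
for country in ["USA", "Germany", "Brazil"]:
    name, sql = virtual_table_for_country(country)
    print(f"{name}: {sql}")
```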

Actions supported by ElasticSearch Connector

ElasticSearch Connector supports the following actions for REST API integration. If an action is not listed below, you can easily edit the Connector file and enhance the out-of-the-box functionality.
 Create Index
Create a new index    [Read more...]
Parameter Description
New Index Name
 Delete Index
Delete an existing index    [Read more...]
Parameter Description
Index to delete
 List indexes
Lists indexes    [Read more...]
 List aliases
Lists aliases    [Read more...]
Parameter Description
 Get index metadata
Gets index metadata    [Read more...]
Parameter Description
Index Enter index name. If you set alias then select underlying alias Index here.
Alias
 Get documents from Index or Alias
Gets documents from Index or Alias    [Read more...]
 Get document by ID from Index or Alias
   [Read more...]
Parameter Description
Index Enter index name. If you set alias then select underlying alias Index here.
Alias
Enter Document ID
 Search / Query documents
Gets documents (Using JSON Query Language)    [Read more...]
 Count documents
   [Read more...]
Parameter Description
Index or Alias Name (choose one --OR-- enter * --OR-- comma separated names) Enter the index name(s) for which you want to perform a document count. Enter * (asterisk) to search across all indices, enter a comma-separated list (i.e. myidx1,myidx2), or select one from the populated list.
Enter Query (JSON Format)
Option Value
All Records {"match_all": { } }
Record where comment or name contains TV word {"query_string": {"query": "comment:TV OR name:TV"} }
Record with comment field (attribute exists) {"query_string": {"query": "_exists_:comment"} }
 Insert documents
Insert documents    [Read more...]
Parameter Description
Index
Alias
 Update documents
Update documents    [Read more...]
Parameter Description
Index
Alias
 Upsert documents
Upserts documents    [Read more...]
Parameter Description
Index
Alias
 Delete documents
Deletes documents    [Read more...]
Parameter Description
Index
 Generic Request
This is a generic endpoint. Use it when an action is not implemented by the connector. Just enter the partial URL (required), Body, Method, Headers, etc. Most parameters are optional except URL.    [Read more...]
Parameter Description
Url API URL goes here. You can enter full URL or Partial URL relative to Base URL. If it is full URL then domain name must be part of ServiceURL or part of TrustedDomains
Body Request Body content goes here
IsMultiPart Set this option if you want to upload file(s) (i.e. POST raw file data) or send data using the multi-part encoding method (i.e. Content-Type: multipart/form-data). A multi-part request allows you to mix key/value pairs and file uploads in the same request. A raw upload, on the other hand, allows only a single file upload (without any key/value pairs).

Raw Upload (Content-Type: application/octet-stream): To upload a single file in raw mode, check this option and specify the full file path, starting with an @ sign, in the Body (e.g. @c:\data\myfile.zip).

Form-Data / Multipart Upload (Content-Type: multipart/form-data): To treat your request data as multi-part fields, specify key/value pairs separated by new lines in the RequestData field (i.e. Body). Each key/value pair goes on its own line, with key and value separated by an equal sign (=). Leading and trailing spaces are ignored, and so are blank lines. If a field value contains special characters, use an escape sequence (e.g. \r\n for a new line, \t for a tab, \@ for an at sign). When the value of a field starts with an at sign (@), it is automatically treated as a file to upload. By default the file content type is determined from the extension; however, you can supply the content type manually for any field this way: [ YourFileFieldName.Content-Type=some-content-type ]. By default a file upload field always includes Content-Type in the request (non-file fields have no Content-Type unless you supply one manually). If for some reason you don't want the Content-Type header in your request, supply a blank Content-Type to exclude this header altogether [e.g. SomeFieldName.Content-Type= ]. In the example below we have supplied Content-Type for file2 and SomeField1; all other fields use the default content type.

If an API requires you to pass Content-Type: multipart/mixed rather than multipart/form-data, manually set Request Header => Content-Type: multipart/mixed (it must start with multipart/, otherwise it will be ignored).

Example of uploading multiple files along with additional fields:

file1=@c:\data\Myfile1.txt
file2=@c:\data\Myfile2.json
file2.Content-Type=application/json
SomeField1=aaaaaaa
SomeField1.Content-Type=text/plain
SomeField2=12345
SomeFieldWithNewLineAndTab=This is line1\r\nThis is line2\r\nThis is \ttab \ttab \ttab
SomeFieldStartingWithAtSign=\@MyTwitterHandle
Filter Enter filter to extract array from response. Example: $.rows[*] --OR-- $.customers[*].orders[*]. Check your response document and find out hierarchy you like to extract
Headers Headers for Request. To enter multiple headers use double pipe or new line after each {header-name}:{value} pair
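
The multi-part Body format described for IsMultiPart (one key=value pair per line, escape sequences for new lines, tabs, and a leading at sign, and optional per-field Content-Type lines) can be assembled programmatically. Below is a minimal Python sketch of those rules as this article states them. The function names `escape_value` and `build_multipart_body` are hypothetical. Escaping only a leading at sign, rather than every at sign, is an assumption, and file paths are passed through unchanged as in the article's example.

```python
from typing import Dict, Optional


def escape_value(value: str) -> str:
    """Escape new lines, tabs, and a leading at sign in a field value."""
    escaped = (
        value.replace("\r\n", "\\r\\n")
        .replace("\n", "\\r\\n")
        .replace("\t", "\\t")
    )
    if escaped.startswith("@"):
        # Assumption: only a leading @ needs escaping, since that is what marks a file.
        escaped = "\\" + escaped
    return escaped


def build_multipart_body(
    files: Dict[str, str],
    fields: Dict[str, object],
    content_types: Optional[Dict[str, str]] = None,
) -> str:
    """Build the key=value Body text for a multi-part request (hypothetical helper)."""
    content_types = content_types or {}
    lines = []
    for name, path in files.items():
        lines.append(f"{name}=@{path}")  # leading @ marks the value as a file path
        if name in content_types:
            lines.append(f"{name}.Content-Type={content_types[name]}")
    for name, value in fields.items():
        lines.append(f"{name}={escape_value(str(value))}")
        if name in content_types:
            lines.append(f"{name}.Content-Type={content_types[name]}")
    return "\n".join(lines)


body = build_multipart_body(
    files={"file1": r"c:\data\Myfile1.txt", "file2": r"c:\data\Myfile2.json"},
    fields={
        "SomeField1": "aaaaaaa",
        "SomeField2": 12345,
        "SomeFieldWithNewLineAndTab": "This is line1\nThis is line2",
        "SomeFieldStartingWithAtSign": "@MyTwitterHandle",
    },
    content_types={"file2": "application/json", "SomeField1": "text/plain"},
)
print(body)
```

The resulting text is what you would place in the Body parameter of the Generic Request action with IsMultiPart enabled.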

ElasticSearch Connector Examples for Talend Studio Connection

This page offers a collection of SQL examples for the ZappySys API ODBC Driver, used under an ODBC Data Source (32/64-bit) or ZappySys Data Gateway, to help you connect to and work with prebuilt connectors.

Create a new index (i.e. Table)    [Read more...]

Create a new index (i.e. create a new table). To throw an error if the table already exists, set ContineOnErrorForStatusCode=0.

SELECT * FROM create_index WITH(Name='my_new_index_name', ContineOnErrorForStatusCode=1)

Delete an existing index (i.e. Table)    [Read more...]

Delete an existing index. If the index does not exist, ElasticSearch returns status code 404; with ContineOn404Error=1 the query continues instead of failing.

SELECT * FROM delete_index WITH(Name='my_index_name', ContineOn404Error=1 )

Generic API Call for ElasticSearch    [Read more...]

When an endpoint is not defined and you want to call an API, use the generic request. The example below shows how to call the CREATE INDEX API in a generic way. See other generic API call examples.

SELECT * FROM generic_request 
WITH(Url='/my_index_name'
	, RequestMethod='PUT'
--	, Body='{}'
--	, Headers='X-Hdr1:aaa || x-HDR2: bbb'
	, Meta='acknowledged:bool'
	)

List indexes    [Read more...]

Lists indexes

SELECT * FROM Indexes

Get index metadata    [Read more...]

Gets index metadata

SELECT * FROM get_index_metadata WITH (Index='my_index_name')

Read ElasticSearch documents from Index (all or with filter)    [Read more...]

Gets documents by index name (i.e. Table name) or alias name (i.e. View name). Using a WHERE clause invokes the client-side engine, so avoid WHERE where possible and use the Query attribute of the WITH clause instead. You can also use the search endpoint to run a query.

SELECT * FROM MyIndexOrAliasName --WITH(Query='{"match": { "PartNumber" : "P50" } }')

Read ElasticSearch documents from Alias (all or with filter)    [Read more...]

Gets documents by index name (i.e. Table name) or alias name (i.e. View name). Using a WHERE clause invokes the client-side engine, so avoid WHERE where possible and use the Query attribute of the WITH clause instead. You can also use the search endpoint to run a query.

SELECT * FROM MyIndexOrAliasName --WITH(Query='{"match": { "PartNumber" : "P50" } }')

Search documents from Index using ElasticSearch Query language    [Read more...]

The example below shows how to search a comment field for the word TV anywhere in the text, for an index named MyIndexOrAliasName (it can be an index name or an alias name). For more information on ElasticSearch query expressions, see https://www.elastic.co/guide/en/elasticsearch/reference/6.8/query-dsl-match-query.html

SELECT * FROM MyIndexOrAliasName WITH(Query='{"match": { "comment" : "TV" } }')
		
		--or use the search endpoint below, which is slightly faster (avoids table / alias list validation)
		--SELECT * FROM search WITH(Index='MyIndexName', Query='{"match": { "comment" : "TV" } }')
		--SELECT * FROM search WITH(Index='MyIndexName', Alias='MyAliasName', Query='{"match": { "comment" : "TV" } }')
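
If the query JSON is produced by a program rather than typed by hand, the statement text for the search endpoint can be assembled as in the following Python sketch. It serializes the query with the standard `json` module and doubles any single quotes so the JSON fits inside the SQL string literal. The function name `build_search_sql` is hypothetical, and doubling single quotes as the escape for string literals is an assumption about the driver's SQL dialect.

```python
import json
from typing import Optional


def build_search_sql(index: str, query: dict, alias: Optional[str] = None) -> str:
    """Build a SELECT against the search endpoint (hypothetical helper)."""
    # Assumption: single quotes inside the literal are escaped by doubling them.
    query_json = json.dumps(query).replace("'", "''")
    parts = [f"Index='{index}'"]
    if alias:
        parts.append(f"Alias='{alias}'")
    parts.append(f"Query='{query_json}'")
    return f"SELECT * FROM search WITH({', '.join(parts)})"


statement = build_search_sql("MyIndexName", {"match": {"comment": "TV"}})
print(statement)
```

Building the query as a dictionary first keeps the JSON well-formed, which is harder to guarantee when the text is concatenated manually.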

Search documents from Alias using ElasticSearch Query language    [Read more...]

The example below shows how to search on an alias rather than an index name. An alias is built on an index (think of it as a view in an RDBMS). This example filters data from the alias using a condition in the query text. For more information on ElasticSearch query expressions, see https://www.elastic.co/guide/en/elasticsearch/reference/6.8/query-dsl-match-query.html

SELECT * FROM MyAliasName WITH(Query='{"match": { "comment" : "TV" } }')
		
		--or use the search endpoint; then you must supply both the Index name and the Alias name
		--calling the search endpoint in the FROM clause is slightly faster (avoids table / alias list validation)
		--SELECT * FROM search WITH(Index='MyIndexName', Alias='MyAliasName', Query='{"match": { "comment" : "TV" } }')

Count ElasticSearch index documents using ElasticSearch Query language    [Read more...]

Below example shows how to get just count of documents from Index (single, multiple or all index). Optionally you can supply expression to filter. For more information on ElasticSearch Query expression check this link https://www.elastic.co/guide/en/elasticsearch/reference/6.8/query-dsl-match-query.html

SELECT * FROM count WITH(Index='MyIndexOrAliasName') --//get count of documents in index / alias named MyIndexOrAliasName
SELECT * FROM count WITH(Index='*') --//get count of documents in all indices (total distinct _id found across all indices + alias) 
SELECT * FROM count WITH(Index='MyIndex1,MyIndex2,MyAlias1,MyAlias2')--//get count of documents in indices named MyIndex1, MyIndex2 and Alias named MyAlias1,MyAlias2
SELECT * FROM count WITH(Index='MyIndexOrAliasName', Query='{"match": { "comment" : "TV" } }') --//get count of documents in MyIndex where comment field contains word "TV"

Count ElasticSearch alias documents using ElasticSearch Query language    [Read more...]

Below example shows how to get just count of documents from Alias (single, multiple or all alias). Optionally you can supply expression to filter. For more information on ElasticSearch Query expression check this link https://www.elastic.co/guide/en/elasticsearch/reference/6.8/query-dsl-match-query.html

SELECT * FROM count WITH(Index='MyIndexOrAliasName') --//get count of documents in index / alias named MyIndexOrAliasName
SELECT * FROM count WITH(Index='*') --//get count of documents in all indices (total distinct _id found across all indices + alias)
SELECT * FROM count WITH(Index='MyIndexOrAlias1,MyIndexOrAlias2') --//get count of documents in MyIndexOrAlias1 and MyIndexOrAlias2
SELECT * FROM count WITH(Index='MyIndex', Query='{"match": { "comment" : "TV" } }') --//get count of documents in Index named MyIndex where comment field contains word "TV"
SELECT * FROM count WITH(Index='MyAlias', Query='{"match": { "comment" : "TV" } }') --//get count of documents in Alias named MyAlias where comment field contains word "TV"

Using JSON Array / Value functions    [Read more...]

Below example shows how to select specific elements from value array or use JSON PATH expression to extract from document array

SELECT _id
        , JSON_ARRAY_FIRST(colors) as first_color
        , JSON_ARRAY_LAST(colors) as last_color
        , JSON_ARRAY_NTH(colors,3) as third_color
        , JSON_VALUE(locationList,'$.locationList[0].country') as first_preferred_country
        , JSON_VALUE(locationList,'$.locationList[?(@.country==''India'')].capital') as capital_of_india
FROM shop WHERE _Id='1'
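
To make clear what the expressions in this query select, here is a plain Python equivalent run against a sample document. The document content is hypothetical (the article does not show the data in the shop index), and the sketch illustrates only the intent of JSON_ARRAY_FIRST, JSON_ARRAY_LAST, and the two JSON_VALUE paths, not the driver's implementation or its handling of missing values.

```python
# Hypothetical sample document; the real shop index data is not shown in the article.
doc = {
    "_id": "1",
    "colors": ["yellow", "orange", "red"],
    "locationList": [
        {"country": "USA", "capital": "Washington, D.C."},
        {"country": "India", "capital": "New Delhi"},
    ],
}

first_color = doc["colors"][0]    # counterpart of JSON_ARRAY_FIRST(colors)
last_color = doc["colors"][-1]    # counterpart of JSON_ARRAY_LAST(colors)

# Counterpart of the path $.locationList[0].country
first_preferred_country = doc["locationList"][0]["country"]

# Counterpart of the filter path that keeps only entries whose country is India
capital_of_india = next(
    (loc["capital"] for loc in doc["locationList"] if loc["country"] == "India"),
    None,
)

print(first_color, last_color, first_preferred_country, capital_of_india)
```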

Insert documents into index with _id autogenerated    [Read more...]

When you don't supply an _id column value, ElasticSearch generates it automatically for you.

INSERT INTO MyIndex([MyCol1], [MyCol2] ) VALUES (100, 'A1')

Insert documents into index with your own _id    [Read more...]

Inserts documents into an index with an explicit _id column. _id is a string datatype, so it can hold a value such as 'A1234'.

INSERT INTO MyIndex(_id, [MyCol1], [MyCol2] ) VALUES ('A1234', 100, 'A1')

Insert documents using nested attribute and raw fragments (JSON sub-documents, arrays)    [Read more...]

This example produces a JSON document like this: {"_id": "some_auto_generated_id", "Location": {"City": "Atlanta", "ZipCode": "30060"}, "ColorsArray": ["red", "green", "blue"], "SomeNestedDoc": {"Col1": "aaa", "Col2": "bbb", "Col3": "ccc"}}. Notice how a column name with a dot translates into nested columns (i.e. City, ZipCode), and how the raw:: prefix lets a value be treated as an array or sub-document.

INSERT INTO MyIndexName ([Location.City], [Location.ZipCode], [raw::ColorsArray], [raw::SomeNestedDoc] )
VALUES ('Atlanta', '30060', '["red","green","blue"]', '{"Col1":"aaa","Col2":"bbb","Col3":"ccc"}' )
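
The column-name conventions used in this example (a dot for nesting, the raw:: prefix for JSON fragments) can be pictured with a short Python sketch that maps column names and values to the resulting document. The function `build_document` is hypothetical and only models the translation described above; it is not the driver's code, and the auto-generated _id is left out because ElasticSearch assigns it.

```python
import json


def build_document(columns: dict) -> dict:
    """Map dotted and raw:: column names to a nested document (illustration only)."""
    doc = {}
    for name, value in columns.items():
        if name.startswith("raw::"):
            # raw:: means the value is already a JSON fragment (array or sub-document).
            name = name[len("raw::"):]
            value = json.loads(value)
        *parents, leaf = name.split(".")
        target = doc
        for key in parents:
            target = target.setdefault(key, {})  # create nested objects as needed
        target[leaf] = value
    return doc


document = build_document({
    "Location.City": "Atlanta",
    "Location.ZipCode": "30060",
    "raw::ColorsArray": '["red","green","blue"]',
    "raw::SomeNestedDoc": '{"Col1":"aaa","Col2":"bbb","Col3":"ccc"}',
})
print(json.dumps(document))
```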

Insert raw document (_rawdoc_ usage)    [Read more...]

This example shows how to insert document(s) in a raw format. When you use the column name _rawdoc_, its value is treated as the raw request body. Notice that we use @ before the string literal in the value. This allows the use of escape sequences (in this case \n for a new line).

INSERT INTO shop(_RAWDOC_) 
VALUES(@'{"create":{"_index":"shop","_id":"1"}}\n{"name":"record-1","colors":["yellow","orange"]}\n{"create":{"_index":"shop","_id":"2"}}\n{"name":"record-2","colors":["red","blue"]}\n')
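
The raw body in this example alternates an action line and a document line, each terminated by \n. Writing that by hand gets tedious for more than a few documents, so here is a minimal Python sketch that generates the same statement text from a dictionary of documents. The function name `build_rawdoc_insert` is hypothetical. The sketch rejects values containing single quotes or backslashes, because how the @'...' literal treats them is not covered in this article.

```python
import json


def build_rawdoc_insert(index: str, docs: dict) -> str:
    """Build an INSERT ... _RAWDOC_ statement from {_id: document} (hypothetical helper)."""
    lines = []
    for doc_id, body in docs.items():
        action = {"create": {"_index": index, "_id": doc_id}}
        lines.append(json.dumps(action, separators=(",", ":")))
        lines.append(json.dumps(body, separators=(",", ":")))
    if any("'" in line or "\\" in line for line in lines):
        # Assumption: quoting rules for these characters are unknown, so refuse them.
        raise ValueError("single quotes and backslashes are not handled by this sketch")
    # Each line ends with the two-character escape sequence \n, not a real new line.
    payload = "".join(line + "\\n" for line in lines)
    return f"INSERT INTO {index}(_RAWDOC_)\nVALUES(@'{payload}')"


statement = build_rawdoc_insert("shop", {
    "1": {"name": "record-1", "colors": ["yellow", "orange"]},
    "2": {"name": "record-2", "colors": ["red", "blue"]},
})
print(statement)
```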

Update documents in index    [Read more...]

Updates documents in index

UPDATE MyIndex
  SET Col1 = 'NewValue-1', Col2 = 'NewValue-2'
  WHERE _Id = 'A1234'

Update raw document (_rawdoc_ usage)    [Read more...]

This example shows how to update document(s) in a raw format. When you use the column name _rawdoc_, its value is treated as the raw request body. Notice that we use @ before the string literal in the value. This allows the use of escape sequences (in this case \n for a new line).

UPDATE shop SET _rawdoc_ = @'{"update": {"_index": "shop", "_id": "1"}}\n{ "doc": {"colors":["yellow","orange"] } }\n{"update": {"_index": "shop", "_id": "2"}}\n{ "doc": {"colors":["yellow","blue"] } }\n'

Update array or sub document    [Read more...]

This example shows how to update an array or nested sub-document by adding the raw:: prefix in front of the column name, so that the column is treated as a JSON fragment.

UPDATE MyIndex
  SET name = 'abcd', [raw::colors]='["yellow","red"]', [raw::location]='{"x":10, "y":20}' 
  WHERE _id='1'

Delete documents from index    [Read more...]

Deletes documents from index

DELETE MyIndex WHERE _id = 'A1234'

Conclusion

In this article we discussed how to connect to ElasticSearch in Talend Studio and integrate data without any coding. Download the ElasticSearch Connector for Talend Studio and try it yourself to see how easy it is. If you still have questions, ask our experts through the live chat (see the bottom-right corner of this page).
