Working with Web Services and Queues - TALEND


In this article, we will show the following two use cases of how to call a Web service using a tWebserviceInput component

  • Calling a public Web service
  • Calling a Web service that is exposed with a Talend Job


Talend Studio offers a variety of components for calling different types of Web services, such as tWebServiceInput, tWebservice (advanced component based on tWebServiceInput), tSOAP (for SOAP Web services) and tREST (for REST Web services).
This procedure uses the tWebserviceInput component to show how to call a Web service step by step.

Calling a public Web service

The Country Web service is a public Web service served by WebserviceX.NET, it is available for everyone as long as you are able to access internet. The following operations are supported in this Web service:

In this example, we will call the GetCurrencyByCountry operation in a Talend Job. Click GetCurrencyByCountry operation in the Country Web service page, you can read the SOAP definition of this operation, including request and response declaration. According to the SOAP request and response,  we can know that this Web service requires a CountryName as an input parameter and returns a string of GetCurrencyByCountryResult. Next, we will create a Talend Job that passes some country names to the Web service and get the currency data for each country name. 

Create a demo Job

Create a demo Job called CallPublicWebService. The Job design is as follows:

We use a tFixedFlowInput_1 to generate some country name that will be pass to tWebserviceInput and be set as the input parameters for calling the Web service, the country names can come from any a data source. In this example, two country names are generated, as seen below:

tFlowToIterate_1 is used to iterate each country name read from the data source. The Use the default (key, value) in global variable box is checked.

The configurations of tWebServiceInput_1 are as below:

  • Type in the WSDL url of the Country Web service in the WSDL field.
  • Type in GetCurrencyByCountry operation in the Method name field.
  • Pass the current country name to the Webservice as input parameter, type in (String)globalMap.get(“row1.countryName”) in the Parameter table.
  • Define one column named GetCurrencyByCountryResult with String data type in the schema.

Execute the Job

Execute the Job to see the following results on the console:

From the result, we can see that this Web service returns a string with XML format, however, in many real situations, we need to extract the value of specified elements such as , from the return string, use a  tExtractXMLField after tWebServiceInput to extract fields from the return string.

In order to print also the column name on the console, the Table model is checked on tLogRow_1.

Execute the job and the following result are printed on the console:

Starting job CallPublicWebService at 14:11 28/06/2014.
[statistics] connecting to socket on port 3691
[statistics] connected
|                  tLogRow_1                  |
|France     |fr         |Franc   |FRF         |
|France     |fr         |Franc   |FRF         |
|                    tLogRow_1                     |
|CountryName|CountryCode|Currency     |CurrencyCode|
|China      |cn         |Yuan Renminbi|CNY         |
|China      |cn         |Yuan Renminbi|CNY         |
[statistics] disconnected
Job CallPublicWebService ended at 14:11 28/06/2014. [exit code=0]

Calling a Web service that is exposed with a Talend Job

We will take another example to describe how to invoke a Web service that is exposed with a Talend Job. In this example, we will use a tWebserviceInput component to call the advanced Web service created in Deploying or exposing a Job as a Web service.

Creating a demo job

Create a demo Job called CallAdvancedWebservice. The Job design is as follows:

tWebserviceInput_1’s schema:

Execute the Job

Execute the Job to see the following results on the console.

Starting job CallWebserviceExample at 16:04 26/06/2014.
[statistics] connecting to socket on port 3811
[statistics] connected
|       tLogRow_1        |
|mydate             |name|
|26-06-2014 16:04:02|Ross|
|26-06-2014 16:04:02|Ross|
|26-06-2014 16:04:02|Ross|
[statistics] disconnected
Job CallWebserviceExample ended at 16:04 26/06/2014. [exit code=0]

Reading and writing to a queue

Talend ESB is supplied with the Apache ActiveMQ software for creating message queues and topics. This recipe shows how we can write to and read from an ActiveMQ queue.

Getting ready

First, we’ll need to start ActiveMQ.

1. Navigate to the folder Runtime_ESBSEactivemqbin and double-click on the file activemq.bat.
2. This will open a command window. Do not close this command window while you are doing this recipe.
3. You can access the ActiveMQ administration console by opening the URL localhost:8161/admin. This will allow you to view your queues and topics.
4. Open the job jo_cook_ch09_0080_readWriteQueue.

How to accomplish it…

The first thing to do is to write a message to a queue.

Writing to the queue

  • Drag a tMomOutput component to the canvas.
  • Create a flow between the tFileInputXML and the tMomOutput
  • Open the component and set the MQ Server to ActiveMQ, the To field to customerData, and the MessageType to Queue. Your tMomOuptut should look like the one in the next screenshot:

  • Run the job.
  • Open a browser and navigate to the URL http://localhost:8161/admin
  • Click Queues on the navigation bar. You will see the queue you just created with one message enqueued.
  • Double-click on the link customerData, and then click on the message.
  • You should now see the customer XML data in the message.

Reading the message from the queue

  • Now deactivate the subjob that writes to the queue.
  • Copy a tMomInput component to the canvas and change the settings to match those for the tMomOutput component, as shown below:

  • Add a tLogRow component and run the job.
  • The message will now be displayed in the log.
  • Return to the browser and click on the Queues option on the navigation bar
  • You will see that the number of pending messages is now 0.

How it works…

The first subjob wrote an XML message to the queue by the name customerData. We then viewed the customer data in the queue.
The second subjob then read the XML message from the queue, and we were able to see that the message had been removed (dequeued).

There’s more…

The tMomOutput and tMomInput components in this recipe were configured manually and will commit records as soon as they are read and/or written.
Ensuring lossless queues using sessions
In any production system, it is imperative that the data isn’t lost when being read/written to or from a data source/target. This recipe shows how this is achieved when reading and writing to queues using the tMom component.

Getting ready

Open the job jo_cook_ch09_0090_losslesQueues.

How to do it…

In a similar fashion to creating sessions with a database, we will first add ActiveMQ connection that will create the session.

  • Open the tMomConnection component to the canvas and tick the box Use Transacted.
  • Open the tMomOutput component and tick the Use existing connection
  • Set the To field to losslessQueue, and the Message Type to Queue.

Add the rollback and commit components

  • Drag a tMomCommit component to the canvas and link this to the tFixedFlowInput using an OnSubjobOk trigger.
  • Open the component and set the MQ Server to ActivMQ.

Successful run

  • Run the job and then check the queue in the web browser.
  • You will see that the queue losslessQueue has been created with 10 messages enqueued.
  • Delete the queue.

Failed run

  • Open the tMap component and change the Expression field in the table killjob to Numeric.sequence(“s2”,1,1) > 8.
  • Run the job and then check the queue in the web browser.
  • You will see that the queue losslessQueue has been created, however, there are 0 records enqueued.

How it works…

We first created a session that allows transaction style processing by defining a tMomConnection component as transacted.
We then ensured that our tMomOutput component is linked to the session by setting the Use existing connection option to true.
When we ran the first time, rows were sent down the flow killjob, so all 10 records are written to the queue when the commit is processed.
Then, the condition in the tMap component is set to mimic a job failure on the eighth record. When we ran the job after the condition was changed, the commit was never processed, so none of the records written to the queue were persisted.

There’s more…

Like the database sessions, the rollback on a queue is automatic if a job fails, so the job only needs to have a tMomCommit component.
The tMomInput component works in the same way as the tMomOutput when using a connection. In that, messages written to the queue will be removed if no commit takes place.


Get Updates on Tech posts, Interview & Certification questions and training schedules