Search our Blogs
Showing results for 
Search instead for 
Do you mean 
 

Chain Haven OnDemand API calls using Python

Processing structured and unstructured data to extract actionable insights for Business Intelligence reporting can be an extremely complex and time consuming task. In many cases there are several serial and parallel path sub-processes required to get the job done, where output from one task has to be transformed and/or combined before being used as the input to another task, with multiple steps until the data is fully processed. This is specialized work and is one of the reasons the job of the Data Analyst is in such high demand.


Fortunately for today’s businesses, APIs make the processing of structured and unstructured data a lot easier, and when chained together, can help to automate and accelerate data analytics processing. In this blog, I will show you how to analyze and extract data-driven insights from audio/video recordings using the Python programming language and the HPE Haven OnDemand library for Python.


If you are new to Haven OnDemand, please click here to create an account. Once your account is setup, click here to copy the API key so you can use it to call Haven OnDemand APIs.

 

Create the Python app

 

Let’s start by creating a Python app called chainapis.py and install the Haven OnDemand library for Python so we can easily call Haven OnDemand APIs. Click here to download the complete app project.

 

from havenondemand.hodclient import *
from havenondemand.hodresponseparser import *
import sys

hodClient = HODClient("YOUR-API-KEY")
parser = HODResponseParser()

# implement callback functions 

# 
params = {}
params["file"] = "testdata/Machine Learning 101.mp3"
params["language"] = "en-GB"

context = {}
context["hodapp"] = HODApps.RECOGNIZE_SPEECH

hodClient.post_request(params, HODApps.RECOGNIZE_SPEECH, async=True, callback=asyncRequestCompleted, **context)

 

Firstly, we import the Haven OnDemand client library and create an instance of the client library called hodClient. Then we create the params variable and specify the name of the media file as well as the language of the speech. For convenience, we also import the Haven OnDemand Response Parser library and create an instance called parser which helps us parse the response and detect if there is any error occurred.


Secondly, we define the keyword argument named context and assign the Haven OnDemand API name to be the value of the “hodapp” keyword. Later, we will use this context value in our callback function to identify the result of an API call.

 

We should also know that the Speech Recognition API call requires the asynchronous mode, so we set the async=True. In fact, any API call which tends to take a long time (more than 2 mins) to process, should be called with the asynchronous mode, otherwise we will end up in getting a timeout error message.


Finally, we specify the callback function named asyncRequestCompleted then call the post_request to submit our request to Haven OnDemand.


Implement the asyncRequestCompleted callback function


In response to an asynchronous request, the result will be a JSON string containing the ID of the task called jobID. And the jobID can be used for querying the actual result or the status of the API call.

 

def asyncRequestCompleted(response, **context):
    jobID = parser.parse_jobid(response)
    if jobID is None:
        errorObj = parser.get_last_error()
        for err in errorObj.errors:
            print ("Error code: %d \nReason: %s \nDetails: %s\n" % (err.error,err.reason, err.detail))
    else:
        hodClient.get_job_status(jobID, requestCompleted, **context)

 

Inside the callback function, we first use the Haven OnDemand Response Parser library to parse the JSON response for extracting the jobID. Then we need to check if the API call has been successfully submitted to Haven OnDemand server. That is why we should always check if the returned value from the parser “is None”. If there was an error, we will call the parser to get the error and print it out to the console.


We will call the get_job_status function to check the status of the task identified by the jobID value. We also specify the requestCompleted callback function and directly passing the **context to the calling function.


Implement the requestCompleted callback function

 

When the requestCompleted callback function is called, we will receive a response and the context.

 

def requestCompleted(response, **context):
    payloadObj = parser.parse_payload(response)

Again we use the Haven OnDemand Response Parser library to parse the JSON response for extracting the payload. All programs have to deal with errors, so does this app. That is why we check to see if the returned value from the parser “payloadObj is None”, meaning the response contains some error or a special status. Within the error handling loop, we will detect the error code from err.error and take appropriate actions. The code can be ErrorCode.QUEUED if the task is on the queue in Haven OnDemand server waiting for its turn to be processed. In this case, we cause a delay of 2 seconds then issue a get_job_status call again. The code can be ErrorCode.IN_PROGRESS if the task is being processed in Haven OnDemand server. In this case, we cause a delay of 10 seconds then poll with a get_job_status call again until the task is done or an error happens where we will print out the error reason and details.

 

if payload is None:
        errorObj = parser.get_last_error()
            resp = ""
		for err in errorObj.errors:
		    if err.error == ErrorCode.QUEUED:
		    # wait for some time then call GetJobStatus or GetJobResult again with the same jobID from err.jobID
		        print ("job is queued. Retry in 2 secs. jobID: " + err.jobID)
			  time.sleep(2)
			  hodClient.get_job_status(err.jobID, requestCompleted, **context)
		        return
		    elif err.error == ErrorCode.IN_PROGRESS:
		        # wait for some time then call GetJobStatus or GetJobResult again with the same jobID from err.jobID
			  print ("task is in progress. Retry in 10 secs. jobID: " + err.jobID)
			  time.sleep(10)
			  hodClient.get_job_status(err.jobID, requestCompleted, **context)
			  return
		    else:
			  resp += "Error code: %d \nReason: %s \nDetails: %s\njobID: %s\n" % (err.error,err.reason, err.detail, err.jobID)
            print (resp)

 

If there is no error, then we can walk the payloadObj JSON object to extract values. The requestCompleted function supposes to be the same callback function for different API calls and different APIs return different payload result. That is why we need to identify the API name so we can parse the response accordingly. Since we passed the API name to the **context when we call the get_job_status function, we will retrieve it through the context[“hodapp”].

 

Chain the APIs


If the response is a result from the Speech Recognition API, we will parse to get the recognized text and call the Sentiment Analysis API to analyze the text. This is how we chain the API calls to take the output from one API and provide it as the input for another API in our data analytics process.


If the response is a result from the Sentiment Analysis API, we will parse the result and print them out on the console.

 

else:
        hodapp = context["hodapp"]
	  if hodapp == HODApps.RECOGNIZE_SPEECH:
            documents = payloadObj["document"]
		for doc in documents:
		    resp += doc["content"] + "\n"
		    # We got the text recognized from the speech. Let’s analyze the text with sentiment analysis API
                paramArr = {}
		    paramArr["text"] = resp
		    context["hodapp"] = HODApps.ANALYZE_SENTIMENT
		    hodClient.post_request(paramArr, HODApps.ANALYZE_SENTIMENT, False, requestCompleted, **context)
		    return
		elif hodapp == HODApps.ANALYZE_SENTIMENT:
		    positives = payloadObj["positive"]
		    resp += "Positive:\n"
		    for pos in positives:
                    if pos.get('sentiment'):
			      resp += "Sentiment: " + pos["sentiment"] + "\n"
			  if pos.get('topic'):
				resp += "Topic: " + pos["topic"] + "\n"
			  resp += "Score: " + "%f " % (pos["score"]) + "\n"
		    negatives = payloadObj["negative"]
		    resp += "Negative:\n"
		    for neg in negatives:
		        if neg.get('sentiment'):
                        resp += "Sentiment: " + neg["sentiment"] + "\n"
			  if neg.get('topic'):
				resp += "Topic: " + neg["topic"] + "\n"
			  resp += "Score: " + "%f " % (neg["score"]) + "\n"
		    aggregate = payloadObj["aggregate"]
		    resp += "Aggregate:\n"
		    resp += "Sentiment: " + aggregate["sentiment"] + "\n"
		    resp += "Score: " + "%f " % (aggregate["score"])
		    print (resp)

 

With the chain API calls mechanism above, we can chain any API in a sequential call as needed and until the data is finally processed.

 

- Paco Vu

 

 

Comments
efroselli Level 4
| ‎09-12-2016 03:35

This is wonderful and useful, but now we can do it all with Combinations!

Please, can we have a tutorial on how to call Combinations from code?

Cheers.

Community Manager
| ‎09-12-2016 10:58

Thanks for great comments efroselli!

 

We are working on updating all our wrapper libraries to support calling combination APIs.

 

Stay tuned for the next blog discussing about this.

 

Kind regards,

Paco

Social Media
About the Author
Topics
† The opinions expressed above are the personal opinions of the authors, not of HPE. By using this site, you accept the Terms of Use and Rules of Participation