Azure Cognitive Services Sentiment Analysis v3.0 using Databricks PySpark

Azure cognitive services

Azure Cognitive Services Text Analytics is a great tool you can use to quickly evaluate a text data set for positive or negative sentiment. For example, a service provider can quickly and easily evaluate reviews as positive or negative and rank them based on the sentiment score detected.

As more and more businesses rely on electronic communications with their clients, understanding the overall sentiment attached to your product, service or image has never been more important. Sentiment analysis allow companies to automatically detect sentiment in any text (reviews, insurance claims, triaging etc) in a fast and highly scalable way.

My latest use was with a property management company with the aim of using the sentiment scores from client feedback on properties to identify and prioritise major issues, which enabled a quicker resolution to issues and improved customer service.

Today I’m going to go through how to use Azure Cognitive Services Text Analytics using Databricks PySpark Notebook to analyze the sentiment of COVID-19 Tweets and return sentiment scores and indicators as to whether it is a positive or negative tweet.

What is Azure Cognitive Services Text Analytics?

Cognitive Services are a set of machine learning algorithms that Microsoft has developed to solve problems in the field of Artificial Intelligence (AI). Developers can consume these algorithms through standard REST calls over the Internet to the Cognitive Services APIs in their Apps, Websites, or Workflows.

For this article, we will focus on the Text Analytics API Sentiment Analysis feature, which evaluates the text and returns sentiment scores and labels for each document and sentence. This is useful for detecting positive and negative sentiment for any language in social media, client reviews, discussion forums, and more.

Consuming the Sentiment Analysis API using PySpark.

To analyse text and return a sentiment analysis for our data we need the code to complete the following steps.

  1. Import a dataset with a text column.
  2. Set a parameter to identify the input dataset text column name making our code dynamic.
  3. Set Azure Cognitive Services API and Key.
  4. Create input Dataframe ready for the API post with an Id and Text column only.
  5. Convert Dataframe to JSON ready for the API Post.
  6. Post the JSON document to the Sentiment Analysis API.
  7. Flatten JSON API response into Dataframe with rows and columns.
  8. Join Dataframe with the original dataset to produce the final dataset and display for analysis.

Steps

  1. Add the following imports to your file PySpark Notebook and create input Dataframe by importing a COVID19 Tweet dataset.
from pyspark.sql.functions import *
import json, requests
# pprint is used to format the JSON response
from pprint import pprint
# File location and type
file_location = "/FileStore/tables/TwitterCovidTweets.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

Results

COVID19 Tweet dataset Result

2. Create and set the name of the text column parameter, set this to the name of the column you want analyzed.

#Set Parameters
dbutils.widgets.removeAll()

ColumnName = dbutils.widgets.text("ColumnName","")

#Get Parameters
ColumnName = dbutils.widgets.get("ColumnName")

#Print Date Range
print("ColumnName = {}".format(ColumnName))

3. For the purpose of this demonstration, we will set the Sentiment Analysis API parameters manually. Please be aware a more secure method would be to use Azure Key Vault to provide a greater level of security.

endpoint = 'https://<Enter Name Here>.com/text/analytics/v3.0/Sentiment'
subscription_key = '<SEnter Subscription Key Here>'

4. The payload to the API consists of a list of JSON documents, which are tuples containing an id, language and a text attribute. The text attribute stores the text to be analyzed, the language is text language and the id can be any value. Therefore we need to add anid column and only select columns id,language and textcolumn for the API payload.

#Create Select List
selectCols = ["ID", "Language"]
selectCols.append(ColumnName)

#Add Unique ID
df = df.withColumn("ID", monotonically_increasing_id())
dfCog = df.filter(col(ColumnName).isNotNull())
dfCog = dfCog.selectExpr(selectCols)
display(dfCog)

5. Convert DataFrame dfCog into a DataFrame of JSON string in the correct format for the API.

dfCogJson = dfCog.toJSON()
dfCogJson = dfCogJson.collect()
resultlist_json = [json.loads(x) for x in dfCogJson] 
document = {"documents": resultlist_json}
pprint(document)

Output below.

Image 2 - Output

6. Post the JSON payload to the API passing in the subscription_key, endpoint and document.

headers = {"Ocp-Apim-Subscription-Key": subscription_key}
response = requests.post(endpoint, headers=headers, json=document)
response = response.json()
response

Successful response.

Image 3 - Successful Response

7. Now we have the response returned in JSON, we must flatten the document into rows and columns.

dfCog = spark.read.json(sc.parallelize(response["documents"]), prefersDecimal=True, dateFormat="yyyy-MM-dd",timestampFormat="yyyy-MM-dd HH:mm:ss",multiLine=False)

dfCog = dfCog.select(col("id").alias("ID"), 
                     col("sentiment").alias("Sentiment"),
                     explode(array("confidenceScores.positive")).alias("Positive"),
                    col("confidenceScores.neutral").alias("Neutral"),
        col("confidenceScores.negative").alias("Negative")
                 )

dfCog = dfCog.select("ID", "Sentiment", "Positive", "Neutral", "Negative")

display(dfCog)

8. Finally, we can join the analyzed dataset to the input dataset and drop the added ID column and display the final output.

dfFinal = df.join(dfCog, df.ID == dfCog.ID, how="left").drop('ID')
display(dfFinal)

The final result provides a sentiment score between 0.0 and 1.0 and an overall sentiment label, with a higher score indicating more positive sentiment.

Image 4 - Overall Sentiment Result

I have created this into a re-usable PySpark function. If you would like a copy please drop me a message and I can send you a link to my private GIT repo.

I hope this was helpful in saving you time understanding Azure Cognitive Sentiment Analysis and PySpark. Any thoughts, questions, corrections, and suggestions are very welcome 🙂

What do you think?

Scroll to Top

Subscribed! We'll let you know when we have new blogs and events...