I've been trying to simplify the following Tweet_listener.py code to something a bit easier to understand because I need to teach it to beginners who will not have a strong Python background. My goal is to do the same thing without the class and functions (mostly). Tweet_Listener.py is what I have so far and it does stream tweets but I cannot get the data to appear in PySpark (the last code bit). When I try to display the data, nothing shows up where as the first Tweet_Listener.py code works. I cannot figure out what I need to change in the Tweet_Listener_simple.py code to make that happen. Any advice would be greatly appreciated!
Tweet_Listener.py
import tweepy
from tweepy import OAuthHandler # to authenticate Twitter API
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
# Twitter developer Credentials to connect to twitter account
access_token = "get_your_own"
access_secret = "get_your_own"
consumer_key = "get_your_own" # API key
consumer_secret = "get_your_own" # API secret key
class TweetsListener(StreamListener):
# initialized the constructor
def __init__(self, csocket):
self.client_socket = csocket
def on_data(self, data):
try:
# read the Twitter data which comes as a JSON format
msg = json.loads(data)
# the 'text' in the JSON file contains the actual tweet.
# We will encode this with utf-8 which will clean out any emojis and stuff that may cause errors for us
# We can come back and change this later on if we need to
print(msg['text'].encode('utf-8'))
# the actual tweet data is sent to the client socket
self.client_socket.send(msg['text'].encode('utf-8'))
return True
except BaseException as e:
# Error handling
print("Ahh! Look what is wrong : %s" % str(e))
return True
# If there actually is an error, we will print the status
def on_error(self, status):
print(status)
return True
# Now let's set up our connection using the access tokens we got from twitter
def sendData(c_socket):
# authentication
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
# twitter_stream will get the actual live tweet data
# This is a stream object
twitter_stream = Stream(auth, TweetsListener(c_socket))
# filter the tweet feeds related to "corona"
twitter_stream.filter(track=['corona'])
# in case you want to pass multiple criteria
# twitter_stream.filter(track=['DataScience','python','Iot'])
if __name__ == '__main__':
# create a socket object
s = socket.socket()
# Get local machine name : host and port
host = "127.0.0.1"
# You'll want to make sure this port is being used elsewhere, otherwise you'll get an error
port = 3350
# Bind to the port
s.bind((host, port))
print("Listening on port: %s" % str(port))
# Wait and Establish the connection with client.
s.listen(5)
# This sends us back a tuple with the data and the addresss where it came from
c, addr = s.accept()
# Let's print it so we can confirm that when we are at the command line
print("Received request from: " + str(addr))
# Keep the stream data available
sendData(c)
Tweet_Listener_simple.py
##### Twitter libraries #######
import tweepy
from tweepy import OAuthHandler # to authenticate Twitter API
from tweepy import Stream
from tweepy.streaming import StreamListener
###### socket library for testing ######
import socket
####### json library for handling the data ########
import json
#################################################################
########## Socket Basic set up (not specific to Twitter) ########
###################################################################
# create a socket object
s = socket.socket()#(socket.AF_INET, socket.SOCK_STREAM) #do we need this?
# Get local machine name : host and port
host = "127.0.0.1"
# You'll want to make sure this port is being used elsewhere, otherwise you'll get an error
port = 3351
# Bind to the port
s.bind((host, port))
print("Listening on port: %s" % str(port))
# Okay so now it will appear as if your code just hangs here when you run it
# Your code will not move to the next line until you establish a connection
# with the client in the next step, meaning your jupyter notebook needs to
# connect to the same address and post to kick off the listening.
# Wait and Establish the connection with client.
s.listen(5)
# This sends us back a tuple with the data and the address where it came from
client_socket, addr = s.accept()
# Let's print it so we can confirm that when we are at the command line
print("Received request from: " + str(addr)," connection created.")
############################################################
###### Now this next part is ALL Twitter Stuff ##########
############################################################
# This is logic from tweepy directly to check on the status of the stream
class MyStreamListener(tweepy.StreamListener):
def on_status(self, status):
print(status.text)
# Twitter developer Credentials to connect to twitter account
access_token = "get_your_own"
access_secret = "get_your_own"
consumer_key = "get_your_own" # API key
consumer_secret = "get_your_own" # API secret key
# Create your listen object
myStreamListener = MyStreamListener()
# authentication
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
# Create your stream object
twitter_stream = tweepy.Stream(auth = auth, listener=myStreamListener)
# You'll want to filter your results down otherwise you'll get EVERYTHING from twitter
# twitter_stream.filter(track=['corona'])
# in case you want to pass multiple criteria
twitter_stream.filter(track=['DataScience','python','Iot'])
# Now we can try to accept data!
# This part will change a bit depending on your data source,
# But this shows you the essentials of what you'll need to do
while True:
try:
# read the Twitter data which comes as a JSON format
msg = json.loads(twitter_stream)
# the 'text' in the JSON file contains the actual tweet.
# We will encode this with utf-8 which will clean out any emojis and stuff that may cause errors for us
# We can come back and change this later on if we need to
print(msg['text'].encode('utf-8'))
# the actual tweet data is sent to the client socket
client_socket.send(msg['text'].encode('utf-8'))
except BaseException as e:
# Error handling
print("Ahh! Look what is wrong : %s" % str(e))
print(status.text)
client_socket.close()
break
PySpark code in a Jupyter notebook:
# Import your dependecies
import pyspark # run after findspark.init() if you need it
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, split
# Start up your pyspark session as always
spark = SparkSession.builder.appName("TwitterStream").getOrCreate()
spark
# read the tweet data from socket
tweet_df = spark \
.readStream \
.format("socket") \
.option("host", "127.0.0.1") \
.option("port", 3351) \
.load()
# type cast the column value
tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")
# Then split words based on space, filter out only hashtag (#) values and group them up.
tweets_tab = tweet_df_string.withColumn('word', explode(split(col('value'), ' '))) \
.groupBy('word') \
.count() \
.sort('count', ascending=False). \
filter(col('word').contains('#'))
writeTweet = tweets_tab \
.writeStream \
.outputMode("complete") \
.format("memory") \
.queryName("tweetquery") \
.start()
print("----- streaming is running -------")
# This is the part where nothing shows up
spark.sql("select * from tweetquery").show()
Output from select *:
+----+-----+
|word|count|
+----+-----+
+----+-----+