In [1]:
import twitter

def oauth_login():
    # Placeholder credentials -- substitute your own values from
    # https://dev.twitter.com/apps
    CONSUMER_KEY = 'Q1'
    CONSUMER_SECRET = 'SQx'
    OAUTH_TOKEN = '7Yd'
    OAUTH_TOKEN_SECRET = 'Ah9'

    auth = twitter.oauth.OAuth(OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
                               CONSUMER_KEY, CONSUMER_SECRET)
    twitter_api = twitter.Twitter(auth=auth)
    return twitter_api

# Sample usage
twitter_api = oauth_login()

# Nothing to see by displaying twitter_api except that it's now a
# defined variable
print twitter_api
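A quick way to confirm that the OAuth handshake actually worked is to ask Twitter who you are. The cell below is a minimal sketch along those lines; it assumes the twitter_api object from above and uses the standard v1.1 account/verify_credentials endpoint, which returns the authenticated user's profile.

In [2]:
# Sanity-check the connection by fetching the authenticated user's own
# profile; this raises a TwitterHTTPError if the credentials are bad
profile = twitter_api.account.verify_credentials()
print profile['screen_name']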
In [3]:
#%% Example 4. Searching for tweets
import twitter
import json

def twitter_search(twitter_api, q, max_results=200, **kw):
    # See https://dev.twitter.com/docs/api/1.1/get/search/tweets and
    # https://dev.twitter.com/docs/using-search for details on advanced
    # search criteria that may be useful for keyword arguments

    search_results = twitter_api.search.tweets(q=q, count=100, **kw)
    statuses = search_results['statuses']

    # Iterate through batches of results by following the cursor until we
    # reach the desired number of results, keeping in mind that OAuth users
    # can "only" make 180 search queries per 15-minute interval. See
    # https://dev.twitter.com/docs/rate-limiting/1.1/limits for details.
    # A reasonable number of results is ~1000, although that number of
    # results may not exist for all queries.

    # Enforce a reasonable limit
    max_results = min(1000, max_results)

    for _ in range(10): # 10*100 = 1000
        try:
            next_results = search_results['search_metadata']['next_results']
        except KeyError: # No more results when next_results doesn't exist
            break

        # Create a dictionary from next_results, which has the following form:
        # ?max_id=313519052523986943&q=NCAA&include_entities=1
        kwargs = dict([ kv.split('=')
                        for kv in next_results[1:].split("&") ])

        search_results = twitter_api.search.tweets(**kwargs)
        statuses += search_results['statuses']
        if len(statuses) > max_results:
            break

    return statuses

# Sample usage
twitter_api = oauth_login()

q = "#rstats"
statuses = twitter_search(twitter_api, q, max_results=10)

# Show one sample search result by slicing the list...
print json.dumps(statuses[0], indent=1)
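As an aside, the hand-rolled query string parsing above can also be delegated to the standard library. The cell below is a small equivalent sketch using Python 2's urlparse.parse_qsl, which additionally handles percent-encoded values:

In [4]:
# Equivalent parsing of a next_results query string with the standard library
import urlparse

next_results = '?max_id=313519052523986943&q=NCAA&include_entities=1'
kwargs = dict(urlparse.parse_qsl(next_results[1:]))
print kwargs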
In [18]:
status_texts = [ status['text']
                 for status in statuses ]

screen_names = [ user_mention['screen_name']
                 for status in statuses
                 for user_mention in status['entities']['user_mentions'] ]

hashtags = [ hashtag['text']
             for status in statuses
             for hashtag in status['entities']['hashtags'] ]

# Compute a collection of all words from all tweets
words = [ w
          for t in status_texts
          for w in t.split() ]

# Explore the first 5 items for each...
print json.dumps(status_texts[0:5], indent=1)
print json.dumps(screen_names[0:5], indent=1)
print json.dumps(hashtags[0:5], indent=1)
print json.dumps(words[0:5], indent=1)
In [12]:
# Explore the first 10 status texts...
print json.dumps(status_texts[0:10], indent=1)
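With the texts, screen names, hashtags, and words extracted, a natural next step is simple frequency analysis. The following cell is a minimal sketch using collections.Counter from the standard library to surface the ten most common items in each list:

In [19]:
# Basic frequency analysis of the extracted fields
from collections import Counter

for label, data in (('Words', words),
                    ('Screen Names', screen_names),
                    ('Hashtags', hashtags)):
    c = Counter(data)
    print label
    print c.most_common()[:10] # The 10 most common items
    print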
In [13]:
# Finding topics of interest by using the filtering capabilities the
# Streaming API offers. Use the Search API to query tweets that have
# already been published; use the Streaming API to sample tweets from
# the public timeline in real time as they are published - two
# different use cases.
import twitter
import sys

# Query terms
q = 'KATY PERRY' # Comma-separated list of terms

print >> sys.stderr, 'Filtering the public timeline for track="%s"' % (q,)

twitter_api = oauth_login() # Returns an instance of twitter.Twitter
twitter_stream = twitter.TwitterStream(auth=twitter_api.auth) # Reference the self.auth parameter

# See https://dev.twitter.com/docs/streaming-apis
stream = twitter_stream.statuses.filter(track=q)

# For illustrative purposes, when all else fails, search for Justin Bieber
# and something is sure to turn up (at least, on Twitter)
for tweet in stream:
    print tweet['text']
    # Save to a database in a particular collection
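The loop above ends with a comment about saving each tweet to a database collection. The cell below is one possible sketch, assuming a MongoDB server on localhost and the pymongo package (3.x or later for insert_one); the database and collection names are illustrative.

In [16]:
import pymongo

def save_to_mongo(data, mongo_db, mongo_db_coll):
    # Connect to a MongoDB server on localhost at the default port
    client = pymongo.MongoClient()
    coll = client[mongo_db][mongo_db_coll]
    return coll.insert_one(data)

# Sample usage: persist each streamed tweet as it arrives
# for tweet in stream:
#     save_to_mongo(tweet, 'twitter_stream', q)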
In [8]:
import json

def extract_tweet_entities(statuses):
    # See https://dev.twitter.com/docs/tweet-entities for more details on tweet
    # entities and additional values that could be extracted in this function.
    # See also https://dev.twitter.com/blog/symbols-entities-tweets for details
    # on a forthcoming tweet entity called "symbols" that is currently
    # undocumented.

    if len(statuses) == 0:
        return [], [], [], []

    screen_names = [ user_mention['screen_name']
                     for status in statuses
                     for user_mention in status['entities']['user_mentions'] ]

    hashtags = [ hashtag['text']
                 for status in statuses
                 for hashtag in status['entities']['hashtags'] ]

    urls = [ url['url']
             for status in statuses
             for url in status['entities']['urls'] ]

    # In some circumstances (such as search results), the media entity
    # may not appear for a given status, so fall back to an empty list
    media = [ media['url']
              for status in statuses
              for media in status['entities'].get('media', []) ]

    return screen_names, hashtags, urls, media

# Sample usage
statuses = twitter_search(twitter_api, "CrossFit")
screen_names, hashtags, urls, media = extract_tweet_entities(statuses)

# Explore the first 5 items for each...
print json.dumps(screen_names[0:5], indent=1)
print json.dumps(hashtags[0:5], indent=1)
print json.dumps(urls[0:5], indent=1)
print json.dumps(media[0:5], indent=1)
In [14]:
import sys
import time
from urllib2 import URLError
import json
import twitter

def make_twitter_request(twitter_api_func, max_errors=3, *args, **kw):

    # A nested helper function that handles common HTTPErrors. Return an
    # updated value for wait_period if the problem is a 503 error. Block
    # until the rate limit is reset if it's a rate limiting issue.
    def handle_twitter_http_error(e, wait_period=2, sleep_when_rate_limited=True):

        if wait_period > 3600: # Seconds
            print >> sys.stderr, 'Too many retries. Quitting.'
            raise e

        # See https://dev.twitter.com/docs/error-codes-responses for common codes

        if e.e.code == 401:
            print >> sys.stderr, 'Encountered 401 Error (Not Authorized)'
            return None
        elif e.e.code == 429:
            print >> sys.stderr, 'Encountered 429 Error (Rate Limit Exceeded)'
            if sleep_when_rate_limited:
                print >> sys.stderr, "Sleeping for 15 minutes, and then I'll try again...ZzZ..."
                sys.stderr.flush()
                time.sleep(60*15 + 5)
                print >> sys.stderr, '...ZzZ...Awake now and trying again.'
                return 2
            else:
                raise e # Allow the caller to handle the rate limiting issue however they'd like
        elif e.e.code in (502, 503):
            print >> sys.stderr, 'Encountered %i Error. Will retry in %i seconds' % (e.e.code,
                                                                                     wait_period)
            time.sleep(wait_period)
            wait_period *= 1.5
            return wait_period
        else:
            raise e

    # End of nested helper function

    wait_period = 2
    error_count = 0

    while True:
        try:
            return twitter_api_func(*args, **kw)
        except twitter.api.TwitterHTTPError, e:
            error_count = 0 # Reset the consecutive URLError count
            wait_period = handle_twitter_http_error(e, wait_period)
            if wait_period is None:
                return
        except URLError, e:
            error_count += 1
            print >> sys.stderr, "URLError encountered. Continuing."
            if error_count > max_errors:
                print >> sys.stderr, "Too many consecutive errors...bailing out."
                raise

# Sample usage
twitter_api = oauth_login()

# See https://dev.twitter.com/docs/api/1.1/get/users/lookup for twitter_api.users.lookup
response = make_twitter_request(twitter_api.users.lookup, screen_name="katyperry")
print json.dumps(response, indent=1)
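# The same wrapper can harden any other endpoint. As a minimal sketch,
# the raw search call from earlier gains retry/backoff behavior simply
# by routing it through make_twitter_request:
robust_results = make_twitter_request(twitter_api.search.tweets,
                                      q='#rstats', count=100)
print >> sys.stderr, 'Fetched %i statuses' % (len(robust_results['statuses']),)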
from functools import partial
from sys import maxint

def get_friends_followers_ids(twitter_api, screen_name=None, user_id=None,
                              friends_limit=maxint, followers_limit=maxint):

    # Must have either screen_name or user_id (logical xor)
    assert (screen_name != None) != (user_id != None), \
        "Must have screen_name or user_id, but not both"

    # See https://dev.twitter.com/docs/api/1.1/get/friends/ids and
    # https://dev.twitter.com/docs/api/1.1/get/followers/ids for details
    # on API parameters

    get_friends_ids = partial(make_twitter_request, twitter_api.friends.ids, count=5000)
    get_followers_ids = partial(make_twitter_request, twitter_api.followers.ids, count=5000)

    friends_ids, followers_ids = [], []

    for twitter_api_func, limit, ids, label in [
                    [get_friends_ids, friends_limit, friends_ids, "friends"],
                    [get_followers_ids, followers_limit, followers_ids, "followers"]
                ]:

        cursor = -1
        while cursor != 0:

            # Use make_twitter_request via the partially bound callable...
            if screen_name:
                response = twitter_api_func(screen_name=screen_name, cursor=cursor)
            else: # user_id
                response = twitter_api_func(user_id=user_id, cursor=cursor)

            ids += response['ids']
            cursor = response['next_cursor']

            print >> sys.stderr, 'Fetched {0} total {1} ids for {2}'.format(len(ids),
                                                    label, (user_id or screen_name))

            # Consider storing the ids to disk during each iteration to provide
            # an additional layer of protection from exceptional circumstances
            if len(ids) >= limit:
                break

    # Do something useful with the ids, like store them to disk...
    return friends_ids[:friends_limit], followers_ids[:followers_limit]

# Sample usage
twitter_api = oauth_login()

friends_ids, followers_ids = get_friends_followers_ids(twitter_api,
                                                       screen_name="katyperry",
                                                       friends_limit=10,
                                                       followers_limit=10)
print friends_ids
print followers_ids
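Numeric IDs aren't very readable on their own. As a follow-up sketch, the cell below resolves them to profiles with the users/lookup endpoint, which accepts up to 100 comma-separated user IDs per request; the helper name get_user_profiles is made up for illustration.

In [15]:
# Resolve opaque user IDs into full profiles, 100 at a time, reusing
# make_twitter_request for robustness
def get_user_profiles(twitter_api, user_ids):
    profiles = []
    for i in range(0, len(user_ids), 100):
        batch = user_ids[i:i+100]
        response = make_twitter_request(twitter_api.users.lookup,
                                        user_id=','.join([str(uid) for uid in batch]))
        if response is not None:
            profiles += response
    return profiles

# Sample usage
for friend in get_user_profiles(twitter_api, friends_ids):
    print friend['screen_name']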