#!/usr/bin/python # # Copyright 2007 Google Inc. All Rights Reserved. '''A library that provides a python interface to the Twitter API''' __author__ = 'dewitt@google.com' __version__ = '0.5' import base64 import md5 import os import simplejson import sys import tempfile import time import urllib import urllib2 import urlparse import twitter class TwitterError(Exception): '''Base class for Twitter errors''' class Status(object): '''A class representing the Status structure used by the twitter API. The Status structure exposes the following properties: status.created_at status.created_at_in_seconds # read only status.id status.text status.relative_created_at # read only status.user ''' def __init__(self, created_at=None, id=None, text=None, user=None, now=None): '''An object to hold a Twitter status message. This class is normally instantiated by the twitter.Api class and returned in a sequence. Note: Dates are posted in the form "Sat Jan 27 04:17:38 +0000 2007" Args: created_at: The time this status message was posted id: The unique id of this status message text: The text of this status message relative_created_at: A human readable string representing the posting time user: A twitter.User instance representing the person posting the message now: The current time, if the client choses to set it. Defaults to the wall clock time. ''' self.created_at = created_at self.id = id self.text = text self.user = user self.now = now def GetCreatedAt(self): '''Get the time this status message was posted. Returns: The time this status message was posted ''' return self._created_at def SetCreatedAt(self, created_at): '''Set the time this status message was posted. Args: created_at: The time this status message was created ''' self._created_at = created_at created_at = property(GetCreatedAt, SetCreatedAt, doc='The time this status message was posted.') def GetCreatedAtInSeconds(self): '''Get the time this status message was posted, in seconds since the epoch. 
Returns: The time this status message was posted, in seconds since the epoch. ''' return time.mktime(time.strptime(self.created_at, '%a %b %d %H:%M:%S +0000 %Y')) created_at_in_seconds = property(GetCreatedAtInSeconds, doc="The time this status message was " "posted, in seconds since the epoch") def GetId(self): '''Get the unique id of this status message. Returns: The unique id of this status message ''' return self._id def SetId(self, id): '''Set the unique id of this status message. Args: id: The unique id of this status message ''' self._id = id id = property(GetId, SetId, doc='The unique id of this status message.') def GetText(self): '''Get the text of this status message. Returns: The text of this status message. ''' return self._text def SetText(self, text): '''Set the text of this status message. Args: text: The text of this status message ''' self._text = text text = property(GetText, SetText, doc='The text of this status message') def GetRelativeCreatedAt(self): '''Get a human redable string representing the posting time Returns: A human readable string representing the posting time ''' fudge = 1.25 delta = int(self.now) - int(self.created_at_in_seconds) if delta < (1 * fudge): return 'about a second ago' elif delta < (60 * (1/fudge)): return 'about %d seconds ago' % (delta) elif delta < (60 * fudge): return 'about a minute ago' elif delta < (60 * 60 * (1/fudge)): return 'about %d minutes ago' % (delta / 60) elif delta < (60 * 60 * fudge): return 'about an hour ago' elif delta < (60 * 60 * 24 * (1/fudge)): return 'about %d hours ago' % (delta / (60 * 60)) elif delta < (60 * 60 * 24 * fudge): return 'about a day ago' else: return 'about %d days ago' % (delta / (60 * 60 * 24)) relative_created_at = property(GetRelativeCreatedAt, doc='Get a human readable string representing' 'the posting time') def GetUser(self): '''Get a twitter.User reprenting the entity posting this status message. 
Returns: A twitter.User reprenting the entity posting this status message ''' return self._user def SetUser(self, user): '''Set a twitter.User reprenting the entity posting this status message. Args: user: A twitter.User reprenting the entity posting this status message ''' self._user = user user = property(GetUser, SetUser, doc='A twitter.User reprenting the entity posting this ' 'status message') def GetNow(self): '''Get the wallclock time for this status message. Used to calculate relative_created_at. Defaults to the time the object was instantiated. Returns: Whatever the status instance believes the current time to be, in seconds since the epoch. ''' if self._now is None: self._now = time.mktime(time.gmtime()) return self._now def SetNow(self, now): '''Set the wallclock time for this status message. Used to calculate relative_created_at. Defaults to the time the object was instantiated. Args: now: The wallclock time for this instance. ''' self._now = now now = property(GetNow, SetNow, doc='The wallclock time for this status instance.') def __ne__(self, other): return not self.__eq__(other) def __eq__(self, other): try: return other and \ self.created_at == other.created_at and \ self.id == other.id and \ self.text == other.text and \ self.user == other.user except AttributeError: return False def __str__(self): '''A string representation of this twitter.Status instance. The return value is the same as the JSON string representation. Returns: A string representation of this twitter.Status instance. ''' return self.AsJsonString() def AsJsonString(self): '''A JSON string representation of this twitter.Status instance. Returns: A JSON string representation of this twitter.Status instance ''' return simplejson.dumps(self.AsDict(), sort_keys=True) def AsDict(self): '''A dict representation of this twitter.Status instance. The return value uses the same key names as the JSON representation. 
Return: A dict representing this twitter.Status instance ''' data = {} if self.created_at: data['created_at'] = self.created_at if self.id: data['id'] = self.id if self.text: data['text'] = self.text if self.user: data['user'] = self.user.AsDict() return data @staticmethod def NewFromJsonDict(data): '''Create a new instance based on a JSON dict. Args: data: A JSON dict, as converted from the JSON in the twitter API Returns: A twitter.Status instance ''' if 'user' in data: user = User.NewFromJsonDict(data['user']) else: user = None return Status(created_at=data.get('created_at', None), id=data.get('id', None), text=data.get('text', None), user=user) class User(object): '''A class representing the User structure used by the twitter API. The User structure exposes the following properties: user.id user.name user.screen_name user.location user.description user.profile_image_url user.url user.status ''' def __init__(self, id=None, name=None, screen_name=None, location=None, description=None, profile_image_url=None, url=None, status=None): self.id = id self.name = name self.screen_name = screen_name self.location = location self.description = description self.profile_image_url = profile_image_url self.url = url self.status = status def GetId(self): '''Get the unique id of this user. Returns: The unique id of this user ''' return self._id def SetId(self, id): '''Set the unique id of this user. Args: id: The unique id of this user. ''' self._id = id id = property(GetId, SetId, doc='The unique id of this user.') def GetName(self): '''Get the real name of this user. Returns: The real name of this user ''' return self._name def SetName(self, name): '''Set the real name of this user. Args: name: The real name of this user ''' self._name = name name = property(GetName, SetName, doc='The real name of this user.') def GetScreenName(self): '''Get the short username of this user. 
Returns: The short username of this user ''' return self._screen_name def SetScreenName(self, screen_name): '''Set the short username of this user. Args: screen_name: the short username of this user ''' self._screen_name = screen_name screen_name = property(GetScreenName, SetScreenName, doc='The short username of this user.') def GetLocation(self): '''Get the geographic location of this user. Returns: The geographic location of this user ''' return self._location def SetLocation(self, location): '''Set the geographic location of this user. Args: location: The geographic location of this user ''' self._location = location location = property(GetLocation, SetLocation, doc='The geographic location of this user.') def GetDescription(self): '''Get the short text description of this user. Returns: The short text description of this user ''' return self._description def SetDescription(self, description): '''Set the short text description of this user. Args: description: The short text description of this user ''' self._description = description description = property(GetDescription, SetDescription, doc='The short text description of this user.') def GetUrl(self): '''Get the homepage url of this user. Returns: The homepage url of this user ''' return self._url def SetUrl(self, url): '''Set the homepage url of this user. Args: url: The homepage url of this user ''' self._url = url url = property(GetUrl, SetUrl, doc='The homepage url of this user.') def GetProfileImageUrl(self): '''Get the url of the thumbnail of this user. Returns: The url of the thumbnail of this user ''' return self._profile_image_url def SetProfileImageUrl(self, profile_image_url): '''Set the url of the thumbnail of this user. 
Args: profile_image_url: The url of the thumbnail of this user ''' self._profile_image_url = profile_image_url profile_image_url= property(GetProfileImageUrl, SetProfileImageUrl, doc='The url of the thumbnail of this user.') def GetStatus(self): '''Get the latest twitter.Status of this user. Returns: The latest twitter.Status of this user ''' return self._status def SetStatus(self, status): '''Set the latest twitter.Status of this user. Args: status: The latest twitter.Status of this user ''' self._status = status status = property(GetStatus, SetStatus, doc='The latest twitter.Status of this user.') def __ne__(self, other): return not self.__eq__(other) def __eq__(self, other): try: return other and \ self.id == other.id and \ self.name == other.name and \ self.screen_name == other.screen_name and \ self.location == other.location and \ self.description == other.description and \ self.profile_image_url == other.profile_image_url and \ self.url == other.url and \ self.status == other.status except AttributeError: return False def __str__(self): '''A string representation of this twitter.User instance. The return value is the same as the JSON string representation. Returns: A string representation of this twitter.User instance. ''' return self.AsJsonString() def AsJsonString(self): '''A JSON string representation of this twitter.User instance. Returns: A JSON string representation of this twitter.User instance ''' return simplejson.dumps(self.AsDict(), sort_keys=True) def AsDict(self): '''A dict representation of this twitter.User instance. The return value uses the same key names as the JSON representation. 
Return: A dict representing this twitter.User instance ''' data = {} if self.id: data['id'] = self.id if self.name: data['name'] = self.name if self.screen_name: data['screen_name'] = self.screen_name if self.location: data['location'] = self.location if self.description: data['description'] = self.description if self.profile_image_url: data['profile_image_url'] = self.profile_image_url if self.url: data['url'] = self.url if self.status: data['status'] = self.status.AsDict() return data @staticmethod def NewFromJsonDict(data): '''Create a new instance based on a JSON dict. Args: data: A JSON dict, as converted from the JSON in the twitter API Returns: A twitter.User instance ''' if 'status' in data: status = Status.NewFromJsonDict(data['status']) else: status = None return User(id=data.get('id', None), name=data.get('name', None), screen_name=data.get('screen_name', None), location=data.get('location', None), description=data.get('description', None), profile_image_url=data.get('profile_image_url', None), url=data.get('url', None), status=status) class DirectMessage(object): '''A class representing the DirectMessage structure used by the twitter API. The DirectMessage structure exposes the following properties: direct_message.id direct_message.created_at direct_message.created_at_in_seconds # read only direct_message.sender_id direct_message.sender_screen_name direct_message.recipient_id direct_message.recipient_screen_name direct_message.text ''' def __init__(self, id=None, created_at=None, sender_id=None, sender_screen_name=None, recipient_id=None, recipient_screen_name=None, text=None): '''An object to hold a Twitter direct message. This class is normally instantiated by the twitter.Api class and returned in a sequence. 
Note: Dates are posted in the form "Sat Jan 27 04:17:38 +0000 2007" Args: id: The unique id of this direct message created_at: The time this direct message was posted sender_id: The id of the twitter user that sent this message sender_screen_name: The name of the twitter user that sent this message recipient_id: The id of the twitter that received this message recipient_screen_name: The name of the twitter that received this message text: The text of this direct message ''' self.id = id self.created_at = created_at self.sender_id = sender_id self.sender_screen_name = sender_screen_name self.recipient_id = recipient_id self.recipient_screen_name = recipient_screen_name self.text = text def GetId(self): '''Get the unique id of this direct message. Returns: The unique id of this direct message ''' return self._id def SetId(self, id): '''Set the unique id of this direct message. Args: id: The unique id of this direct message ''' self._id = id id = property(GetId, SetId, doc='The unique id of this direct message.') def GetCreatedAt(self): '''Get the time this direct message was posted. Returns: The time this direct message was posted ''' return self._created_at def SetCreatedAt(self, created_at): '''Set the time this direct message was posted. Args: created_at: The time this direct message was created ''' self._created_at = created_at created_at = property(GetCreatedAt, SetCreatedAt, doc='The time this direct message was posted.') def GetCreatedAtInSeconds(self): '''Get the time this direct message was posted, in seconds since the epoch. Returns: The time this direct message was posted, in seconds since the epoch. ''' return time.mktime(time.strptime(self.created_at, '%a %b %d %H:%M:%S +0000 %Y')) created_at_in_seconds = property(GetCreatedAtInSeconds, doc="The time this direct message was " "posted, in seconds since the epoch") def GetSenderId(self): '''Get the unique sender id of this direct message. 
Returns: The unique sender id of this direct message ''' return self._sender_id def SetSenderId(self, sender_id): '''Set the unique sender id of this direct message. Args: sender id: The unique sender id of this direct message ''' self._sender_id = sender_id sender_id = property(GetSenderId, SetSenderId, doc='The unique sender id of this direct message.') def GetSenderScreenName(self): '''Get the unique sender screen name of this direct message. Returns: The unique sender screen name of this direct message ''' return self._sender_screen_name def SetSenderScreenName(self, sender_screen_name): '''Set the unique sender screen name of this direct message. Args: sender_screen_name: The unique sender screen name of this direct message ''' self._sender_screen_name = sender_screen_name sender_screen_name = property(GetSenderScreenName, SetSenderScreenName, doc='The unique sender screen name of this direct message.') def GetRecipientId(self): '''Get the unique recipient id of this direct message. Returns: The unique recipient id of this direct message ''' return self._recipient_id def SetRecipientId(self, recipient_id): '''Set the unique recipient id of this direct message. Args: recipient id: The unique recipient id of this direct message ''' self._recipient_id = recipient_id recipient_id = property(GetRecipientId, SetRecipientId, doc='The unique recipient id of this direct message.') def GetRecipientScreenName(self): '''Get the unique recipient screen name of this direct message. Returns: The unique recipient screen name of this direct message ''' return self._recipient_screen_name def SetRecipientScreenName(self, recipient_screen_name): '''Set the unique recipient screen name of this direct message. 
Args: recipient_screen_name: The unique recipient screen name of this direct message ''' self._recipient_screen_name = recipient_screen_name recipient_screen_name = property(GetRecipientScreenName, SetRecipientScreenName, doc='The unique recipient screen name of this direct message.') def GetText(self): '''Get the text of this direct message. Returns: The text of this direct message. ''' return self._text def SetText(self, text): '''Set the text of this direct message. Args: text: The text of this direct message ''' self._text = text text = property(GetText, SetText, doc='The text of this direct message') def __ne__(self, other): return not self.__eq__(other) def __eq__(self, other): try: return other and \ self.id == other.id and \ self.created_at == other.created_at and \ self.sender_id == other.sender_id and \ self.sender_screen_name == other.sender_screen_name and \ self.recipient_id == other.recipient_id and \ self.recipient_screen_name == other.recipient_screen_name and \ self.text == other.text except AttributeError: return False def __str__(self): '''A string representation of this twitter.DirectMessage instance. The return value is the same as the JSON string representation. Returns: A string representation of this twitter.DirectMessage instance. ''' return self.AsJsonString() def AsJsonString(self): '''A JSON string representation of this twitter.DirectMessage instance. Returns: A JSON string representation of this twitter.DirectMessage instance ''' return simplejson.dumps(self.AsDict(), sort_keys=True) def AsDict(self): '''A dict representation of this twitter.DirectMessage instance. The return value uses the same key names as the JSON representation. 
Return: A dict representing this twitter.DirectMessage instance ''' data = {} if self.id: data['id'] = self.id if self.created_at: data['created_at'] = self.created_at if self.sender_id: data['sender_id'] = self.sender_id if self.sender_screen_name: data['sender_screen_name'] = self.sender_screen_name if self.recipient_id: data['recipient_id'] = self.recipient_id if self.recipient_screen_name: data['recipient_screen_name'] = self.recipient_screen_name if self.text: data['text'] = self.text return data @staticmethod def NewFromJsonDict(data): '''Create a new instance based on a JSON dict. Args: data: A JSON dict, as converted from the JSON in the twitter API Returns: A twitter.DirectMessage instance ''' return DirectMessage(created_at=data.get('created_at', None), recipient_id=data.get('recipient_id', None), sender_id=data.get('sender_id', None), text=data.get('text', None), sender_screen_name=data.get('sender_screen_name', None), id=data.get('id', None), recipient_screen_name=data.get('recipient_screen_name', None)) class Api(object): '''A python interface into the Twitter API By default, the Api caches results for 1 minute. Example usage: To create an instance of the twitter.Api class, with no authentication: >>> import twitter >>> api = twitter.Api() To fetch the most recently posted public twitter status messages: >>> statuses = api.GetPublicTimeline() >>> print [s.user.name for s in statuses] [u'DeWitt', u'Kesuke Miyagi', u'ev', u'Buzz Andersen', u'Biz Stone'] #... To fetch a single user's public status messages, where "user" is either a Twitter "short name" or their user id. 
>>> statuses = api.GetUserTimeline(user) >>> print [s.text for s in statuses] To use authentication, instantiate the twitter.Api class with a username and password: >>> api = twitter.Api(username='twitter user', password='twitter pass') To fetch your friends (after being authenticated): >>> users = api.GetFriends() >>> print [u.name for u in users] To post a twitter status message (after being authenticated): >>> status = api.PostUpdate('I love python-twitter!') >>> print status.text I love python-twitter! There are many other methods, including: >>> api.PostDirectMessage(user, text) >>> api.GetUser(user) >>> api.GetReplies() >>> api.GetUserTimeline(user) >>> api.GetStatus(id) >>> api.DestroyStatus(id) >>> api.GetFriendsTimeline(user) >>> api.GetFriends(user) >>> api.GetFollowers() >>> api.GetFeatured() >>> api.GetDirectMessages() >>> api.PostDirectMessage(user, text) >>> api.DestroyDirectMessage(id) >>> api.DestroyFriendship(user) >>> api.CreateFriendship(user) ''' DEFAULT_CACHE_TIMEOUT = 60 # cache for 1 minute _API_REALM = 'Twitter API' def __init__(self, username=None, password=None, input_encoding=None, request_headers=None, twitterserver='twitter.com'): '''Instantiate a new twitter.Api object. Args: username: The username of the twitter account. [optional] password: The password for the twitter account. [optional] input_encoding: The encoding used to encode input strings. [optional] request_header: A dictionary of additional HTTP request headers. [optional] twitterserver: you can use twitter.com or identi.ca [optional] ''' self._twitterserver = twitterserver self._cache = _FileCache() self._urllib = urllib2 self._cache_timeout = Api.DEFAULT_CACHE_TIMEOUT self._InitializeRequestHeaders(request_headers) self._InitializeUserAgent() self._input_encoding = input_encoding self.SetCredentials(username, password) def GetPublicTimeline(self, since_id=None): '''Fetch the sequnce of public twitter.Status message for all users. 
Args: since_id: Returns only public statuses with an ID greater than (that is, more recent than) the specified ID. [Optional] Returns: An sequence of twitter.Status instances, one for each message ''' parameters = {} if since_id: parameters['since_id'] = since_id url = 'http://%s/statuses/public_timeline.json' % self._twitterserver json = self._FetchUrl(url, parameters=parameters) data = simplejson.loads(json) return [Status.NewFromJsonDict(x) for x in data] def GetFriendsTimeline(self, user=None, since=None): '''Fetch the sequence of twitter.Status messages for a user's friends The twitter.Api instance must be authenticated if the user is private. Args: user: Specifies the ID or screen name of the user for whom to return the friends_timeline. If unspecified, the username and password must be set in the twitter.Api instance. [optional] since: Narrows the returned results to just those statuses created after the specified HTTP-formatted date. [optional] Returns: A sequence of twitter.Status instances, one for each message ''' if user: url = 'http://%s/statuses/friends_timeline/%s.json' % (self._twitterserver,user) elif not user and not self._username: raise TwitterError("User must be specified if API is not authenticated.") else: url = 'http://%s/statuses/friends_timeline.json' % self._twitterserver parameters = {} if since: parameters['since'] = since json = self._FetchUrl(url, parameters=parameters) data = simplejson.loads(json) return [Status.NewFromJsonDict(x) for x in data] def GetUserTimeline(self, user=None, count=None, since=None): '''Fetch the sequence of public twitter.Status messages for a single user. The twitter.Api instance must be authenticated if the user is private. Args: user: either the username (short_name) or id of the user to retrieve. If not specified, then the current authenticated user is used. 
[optional] count: the number of status messages to retrieve [optional] since: Narrows the returned results to just those statuses created after the specified HTTP-formatted date. [optional] Returns: A sequence of twitter.Status instances, one for each message up to count ''' try: if count: int(count) except: raise TwitterError("Count must be an integer") parameters = {} if count: parameters['count'] = count if since: parameters['since'] = since if user: url = 'http://%s/statuses/user_timeline/%s.json' % (self._twitterserver,user) elif not user and not self._username: raise TwitterError("User must be specified if API is not authenticated.") else: url = 'http://%s/statuses/user_timeline.json' % self._twitterserver json = self._FetchUrl(url, parameters=parameters) data = simplejson.loads(json) return [Status.NewFromJsonDict(x) for x in data] def GetStatus(self, id): '''Returns a single status message. The twitter.Api instance must be authenticated if the status message is private. Args: id: The numerical ID of the status you're trying to retrieve. Returns: A twitter.Status instance representing that status message ''' try: if id: int(id) except: raise TwitterError("id must be an integer") url = 'http://%s/statuses/show/%s.json' % (self._twitterserver,id) json = self._FetchUrl(url) data = simplejson.loads(json) return Status.NewFromJsonDict(data) def DestroyStatus(self, id): '''Destroys the status specified by the required ID parameter. The twitter.Api instance must be authenticated and thee authenticating user must be the author of the specified status. Args: id: The numerical ID of the status you're trying to destroy. 
Returns: A twitter.Status instance representing the destroyed status message ''' try: if id: int(id) except: raise TwitterError("id must be an integer") url = 'http://%s/statuses/destroy/%s.json' % (self._twitterserver,id) json = self._FetchUrl(url, post_data={}) data = simplejson.loads(json) return Status.NewFromJsonDict(data) def SetSource(self, source): self._source = source def PostUpdate(self, text): '''Post a twitter status message from the authenticated user. The twitter.Api instance must be authenticated. Args: text: The message text to be posted. Must be less than 140 characters. Returns: A twitter.Status instance representing the message posted ''' if not self._username: raise TwitterError("The twitter.Api instance must be authenticated.") if len(text) > 140: raise TwitterError("Text must be less than or equal to 140 characters.") url = 'http://%s/statuses/update.json' % self._twitterserver data = {'status': text} if (hasattr(self,'_source')): data['source'] = self._source json = self._FetchUrl(url, post_data=data) data = simplejson.loads(json) return Status.NewFromJsonDict(data) def GetReplies(self): '''Get a sequence of status messages representing the 20 most recent replies (status updates prefixed with @username) to the authenticating user. Returns: A sequence of twitter.Status instances, one for each reply to the user. ''' url = 'http://%s/statuses/replies.json' % self._twitterserver if not self._username: raise TwitterError("The twitter.Api instance must be authenticated.") json = self._FetchUrl(url) data = simplejson.loads(json) return [Status.NewFromJsonDict(x) for x in data] def GetFriends(self, user=None): '''Fetch the sequence of twitter.User instances, one for each friend. Args: user: the username or id of the user whose friends you are fetching. If not specified, defaults to the authenticated user. [optional] The twitter.Api instance must be authenticated. 
Returns: A sequence of twitter.User instances, one for each friend ''' if not self._username: raise TwitterError("twitter.Api instance must be authenticated") if user: url = 'http://%s/statuses/friends/%s.json' % (self._twitterserver,user) else: url = 'http://%s/statuses/friends.json' % self._twitterserver json = self._FetchUrl(url) data = simplejson.loads(json) return [User.NewFromJsonDict(x) for x in data] def GetFollowers(self): '''Fetch the sequence of twitter.User instances, one for each follower The twitter.Api instance must be authenticated. Returns: A sequence of twitter.User instances, one for each follower ''' if not self._username: raise TwitterError("twitter.Api instance must be authenticated") url = 'http://%s/statuses/followers.json' % self._twitterserver json = self._FetchUrl(url) data = simplejson.loads(json) return [User.NewFromJsonDict(x) for x in data] def GetFeatured(self): '''Fetch the sequence of twitter.User instances featured on twitter.com The twitter.Api instance must be authenticated. Returns: A sequence of twitter.User instances ''' url = 'http://%s/statuses/featured.json' % self._twitterserver json = self._FetchUrl(url) data = simplejson.loads(json) return [User.NewFromJsonDict(x) for x in data] def GetUser(self, user): '''Returns a single user. The twitter.Api instance must be authenticated. Args: user: The username or id of the user to retrieve. Returns: A twitter.User instance representing that user ''' url = 'http://%s/users/show/%s.json' % (self._twitterserver,user) json = self._FetchUrl(url) data = simplejson.loads(json) return User.NewFromJsonDict(data) def GetDirectMessages(self, since=None): '''Returns a list of the direct messages sent to the authenticating user. The twitter.Api instance must be authenticated. Args: since: Narrows the returned results to just those statuses created after the specified HTTP-formatted date. 
[optional] Returns: A sequence of twitter.DirectMessage instances ''' url = 'http://%s/direct_messages.json' % self._twitterserver if not self._username: raise TwitterError("The twitter.Api instance must be authenticated.") parameters = {} if since: parameters['since'] = since json = self._FetchUrl(url, parameters=parameters) data = simplejson.loads(json) return [DirectMessage.NewFromJsonDict(x) for x in data] def PostDirectMessage(self, user, text): '''Post a twitter direct message from the authenticated user The twitter.Api instance must be authenticated. Args: user: The ID or screen name of the recipient user. text: The message text to be posted. Must be less than 140 characters. Returns: A twitter.DirectMessage instance representing the message posted ''' if not self._username: raise TwitterError("The twitter.Api instance must be authenticated.") url = 'http://%s/direct_messages/new.json' % self._twitterserver data = {'text': text, 'user': user} json = self._FetchUrl(url, post_data=data) data = simplejson.loads(json) return DirectMessage.NewFromJsonDict(data) def DestroyDirectMessage(self, id): '''Destroys the direct message specified in the required ID parameter. The twitter.Api instance must be authenticated, and the authenticating user must be the recipient of the specified direct message. Args: id: The id of the direct message to be destroyed Returns: A twitter.DirectMessage instance representing the message destroyed ''' url = 'http://%s/direct_messages/destroy/%s.json' % (self._twitterserver,id) json = self._FetchUrl(url, post_data={}) data = simplejson.loads(json) return DirectMessage.NewFromJsonDict(data) def CreateFriendship(self, user): '''Befriends the user specified in the user parameter as the authenticating user. The twitter.Api instance must be authenticated. Args: The ID or screen name of the user to befriend. Returns: A twitter.User instance representing the befriended user. 
''' url = 'http://%s/friendships/create/%s.json' % (self._twitterserver,user) json = self._FetchUrl(url, post_data={}) data = simplejson.loads(json) return User.NewFromJsonDict(data) def DestroyFriendship(self, user): '''Discontinues friendship with the user specified in the user parameter. The twitter.Api instance must be authenticated. Args: The ID or screen name of the user with whom to discontinue friendship. Returns: A twitter.User instance representing the discontinued friend. ''' url = 'http://%s/friendships/destroy/%s.json' % (self._twitterserver,user) json = self._FetchUrl(url, post_data={}) data = simplejson.loads(json) return User.NewFromJsonDict(data) def SetCredentials(self, username, password): '''Set the username and password for this instance Args: username: The twitter username. password: The twitter password. ''' self._username = username self._password = password def ClearCredentials(self): '''Clear the username and password for this instance ''' self._username = None self._password = None def SetCache(self, cache): '''Override the default cache. Set to None to prevent caching. Args: cache: an instance that supports the same API as the twitter._FileCache ''' self._cache = cache def SetUrllib(self, urllib): '''Override the default urllib implementation. Args: urllib: an instance that supports the same API as the urllib2 module ''' self._urllib = urllib def SetCacheTimeout(self, cache_timeout): '''Override the default cache timeout. Args: cache_timeout: time, in seconds, that responses should be reused. ''' self._cache_timeout = cache_timeout def SetUserAgent(self, user_agent): '''Override the default user agent Args: user_agent: a string that should be send to the server as the User-agent ''' self._request_headers['User-Agent'] = user_agent def SetXTwitterHeaders(self, client, url, version): '''Set the X-Twitter HTTP headers that will be sent to the server. Args: client: The client name as a string. 
def _BuildUrl(self, url, path_elements=None, extra_params=None):
  '''Return url with extra path elements and query parameters appended.

  Args:
    url: The base URL as a string.
    path_elements: An optional sequence of strings appended to the path,
      joined with '/'.  Falsy entries (e.g. None) are skipped.
    extra_params: An optional dict of query parameters.  It is encoded with
      self._EncodeParameters and appended to any existing query string.

  Returns:
    The rebuilt URL as a string.
  '''
  # Break url into constituent parts
  (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)

  # Add any additional path elements to the path
  if path_elements:
    # Filter out the path elements that are falsy (None, '', ...)
    segments = [element for element in path_elements if element]
    if not path.endswith('/'):
      path += '/'
    path += '/'.join(segments)

  # Add any additional query parameters to the query string
  if extra_params:
    extra_query = self._EncodeParameters(extra_params)
    # The encoded string is empty when every value was None; appending it
    # would leave a dangling '&' on an existing query.
    if extra_query:
      if query:
        query += '&' + extra_query
      else:
        query = extra_query

  # Return the rebuilt URL
  return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))


def _InitializeRequestHeaders(self, request_headers):
  '''Store the initial HTTP request headers for this instance.

  Args:
    request_headers: An optional dict of header name/value pairs.  A copy is
      stored, because this instance later writes its own headers (such as
      'Authorization' and 'User-Agent') into the stored dict.
  '''
  if request_headers:
    self._request_headers = dict(request_headers)
  else:
    self._request_headers = {}


def _InitializeUserAgent(self):
  '''Set the default User-Agent header from the urllib and twitter versions.'''
  user_agent = 'Python-urllib/%s (python-twitter/%s)' % \
               (self._urllib.__version__, twitter.__version__)
  self.SetUserAgent(user_agent)


def _AddAuthorizationHeader(self, username, password):
  '''Add an HTTP Basic 'Authorization' header to the request headers.

  Nothing is added unless both username and password are truthy.

  Args:
    username: The username to send.
    password: The password to send.
  '''
  if username and password:
    credentials = '%s:%s' % (username, password)
    # b64encode never inserts line breaks, unlike encodestring, which wraps
    # its output and would corrupt the header for long credentials.
    try:
      token = base64.b64encode(credentials)
    except (TypeError, UnicodeError):
      # Text credentials that cannot be used as-is are sent as UTF-8 bytes.
      token = base64.b64encode(credentials.encode('utf-8'))
    if not isinstance(token, str):
      token = token.decode('ascii')
    self._request_headers['Authorization'] = 'Basic %s' % token


def _RemoveAuthorizationHeader(self):
  '''Remove the 'Authorization' header from the request headers, if present.'''
  if self._request_headers and 'Authorization' in self._request_headers:
    del self._request_headers['Authorization']


def _GetOpener(self, url, username=None, password=None):
  '''Build a URL opener carrying this instance's request headers.

  Args:
    url: The URL that will be opened; its network location is registered
      with the basic-auth handler when credentials are supplied.
    username: An optional HTTP Basic Auth username.
    password: An optional HTTP Basic Auth password.

  Returns:
    An opener built by self._urllib.build_opener, with addheaders set to the
    current request headers.
  '''
  if username and password:
    self._AddAuthorizationHeader(username, password)
    handler = self._urllib.HTTPBasicAuthHandler()
    (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)
    handler.add_password(Api._API_REALM, netloc, username, password)
    opener = self._urllib.build_opener(handler)
  else:
    # Drop any header left over from an earlier authenticated request so
    # that cleared credentials are no longer sent.
    self._RemoveAuthorizationHeader()
    opener = self._urllib.build_opener()
  opener.addheaders = list(self._request_headers.items())
  return opener
def _Encode(self, s):
  '''Return s as a UTF-8 encoded byte string.

  Args:
    s: The value to encode.  Byte strings are decoded with
      self._input_encoding when one is set; unicode strings are used as-is;
      any other value (e.g. an int) is converted with unicode().

  Returns:
    The UTF-8 encoded representation of s.
  '''
  if isinstance(s, unicode):
    # Already text: decoding it again would raise TypeError.
    text = s
  elif self._input_encoding and isinstance(s, str):
    text = unicode(s, self._input_encoding)
  else:
    text = unicode(s)
  return text.encode('utf-8')


def _EncodeParameters(self, parameters):
  '''Return a string in key=value&key=value form

  Values of None are not included in the output string.

  Args:
    parameters:
      A dict of (key, value) pairs, where each value is encoded with
      self._Encode (which honours self._input_encoding)
  Returns:
    A URL-encoded string in "key=value&key=value" form, or None when
    parameters is None
  '''
  if parameters is None:
    return None
  encoded = [(key, self._Encode(value))
             for key, value in parameters.items() if value is not None]
  return urllib.urlencode(dict(encoded))


def _EncodePostData(self, post_data):
  '''Return a string in key=value&key=value form

  Every value is passed through self._Encode (which honours
  self._input_encoding) and is subsequently URL encoded.  Unlike
  _EncodeParameters, values of None are not filtered out.

  Args:
    post_data:
      A dict of (key, value) pairs
  Returns:
    A URL-encoded string in "key=value&key=value" form, or None when
    post_data is None
  '''
  if post_data is None:
    return None
  encoded = [(key, self._Encode(value)) for key, value in post_data.items()]
  return urllib.urlencode(dict(encoded))


def _FetchUrl(self, url, post_data=None, parameters=None, no_cache=None):
  """Fetch a URL, optionally caching for a specified time.

  The request is made with the credentials stored on this instance
  (self._username / self._password).

  Args:
    url: The URL to retrieve
    post_data: A dict of key/value pairs to send as the request body.  If
      set (even to an empty dict), the response is never served from or
      stored in the cache. [OPTIONAL]
    parameters: A dict of key/value pairs that should be added to
      the query string. [OPTIONAL]
    no_cache: If true, overrides the cache on the current request [OPTIONAL]

  Returns:
    A string containing the body of the response.
  """
  # Add key/value parameters to the query string of the url
  url = self._BuildUrl(url, extra_params=parameters)

  # Get a url opener that can handle basic auth
  opener = self._GetOpener(url, username=self._username, password=self._password)

  encoded_post_data = self._EncodePostData(post_data)

  # An empty post_data dict encodes to '', which is falsy, so test against
  # None: a request with a body must always reach the server.
  has_body = encoded_post_data is not None

  # Open and return the URL immediately if we're not going to cache
  if has_body or no_cache or not self._cache or not self._cache_timeout:
    return opener.open(url, encoded_post_data).read()

  # Unique keys are a combination of the url and the username
  if self._username:
    key = self._username + ':' + url
  else:
    key = url

  # See if it has been cached before
  last_cached = self._cache.GetCachedTime(key)

  # If the cached version is missing or outdated then fetch another and
  # store it; otherwise reuse the cached copy
  if not last_cached or time.time() >= last_cached + self._cache_timeout:
    url_data = opener.open(url, encoded_post_data).read()
    self._cache.Set(key, url_data)
  else:
    url_data = self._cache.Get(key)

  return url_data
class _FileCacheError(Exception):
  '''Base exception class for FileCache related errors'''


class _FileCache(object):
  '''A simple cache that stores each entry as a file on disk.

  An entry lives at <root>/<c0>/<c1>/<c2>/<md5 hex digest of the key>, where
  c0..c2 are the first DEPTH characters of the digest.
  '''

  # Number of leading digest characters used as nested directory names.
  DEPTH = 3

  def __init__(self,root_directory=None):
    '''Create a cache rooted at root_directory.

    Args:
      root_directory: The directory that holds the cache.  Defaults to a
        per-user directory under the system temporary directory.
    '''
    self._InitializeRootDirectory(root_directory)

  def Get(self,key):
    '''Return the data cached under key, or None if nothing is cached.'''
    path = self._GetPath(key)
    if not os.path.exists(path):
      return None
    # Binary mode returns the stored bytes unmodified; the handle is closed
    # explicitly rather than left to the garbage collector.
    cache_file = open(path, 'rb')
    try:
      return cache_file.read()
    finally:
      cache_file.close()

  def Set(self,key,data):
    '''Store data under key, replacing any existing entry.

    Args:
      key: The cache key.
      data: The byte string to store.

    Raises:
      _FileCacheError: If the target directory exists but is not a directory,
        or the computed path is not under the root directory.
    '''
    path = self._GetPath(key)
    if not path.startswith(self._root_directory):
      raise _FileCacheError('%s does not appear to live under %s' %
                            (path, self._root_directory))
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
      os.makedirs(directory)
    if not os.path.isdir(directory):
      raise _FileCacheError('%s exists but is not a directory' % directory)
    # Create the temporary file next to its destination so the final rename
    # never has to cross a filesystem boundary.
    temp_fd, temp_path = tempfile.mkstemp(dir=directory)
    renamed = False
    try:
      temp_fp = os.fdopen(temp_fd, 'wb')
      try:
        temp_fp.write(data)
      finally:
        temp_fp.close()
      if os.path.exists(path):
        os.remove(path)
      os.rename(temp_path, path)
      renamed = True
    finally:
      # Do not leave a stray temporary file behind when anything failed.
      if not renamed and os.path.exists(temp_path):
        os.remove(temp_path)

  def Remove(self,key):
    '''Delete the entry cached under key, if there is one.

    Raises:
      _FileCacheError: If the computed path is not under the root directory.
    '''
    path = self._GetPath(key)
    if not path.startswith(self._root_directory):
      raise _FileCacheError('%s does not appear to live under %s' %
                            (path, self._root_directory ))
    if os.path.exists(path):
      os.remove(path)

  def GetCachedTime(self,key):
    '''Return the modification time of the entry for key, or None if absent.'''
    path = self._GetPath(key)
    if os.path.exists(path):
      return os.path.getmtime(path)
    else:
      return None

  def _GetUsername(self):
    '''Attempt to find the username in a cross-platform fashion.'''
    for variable in ('USER', 'LOGNAME', 'USERNAME'):
      username = os.getenv(variable)
      if username:
        return username
    # os.getlogin can be missing or can raise OSError; fall back to
    # 'nobody' in that case instead of propagating the error.
    try:
      return os.getlogin() or 'nobody'
    except (AttributeError, OSError):
      return 'nobody'

  def _GetTmpCachePath(self):
    '''Return the default per-user cache directory under the temp directory.'''
    username = self._GetUsername()
    cache_directory = 'python.cache_' + username
    return os.path.join(tempfile.gettempdir(), cache_directory)

  def _InitializeRootDirectory(self, root_directory):
    '''Resolve, create if necessary, and store the cache root directory.

    Args:
      root_directory: The requested root, or a falsy value for the default.

    Raises:
      _FileCacheError: If the root path exists but is not a directory.
    '''
    if not root_directory:
      root_directory = self._GetTmpCachePath()
    root_directory = os.path.abspath(root_directory)
    if not os.path.exists(root_directory):
      # makedirs also creates missing parent directories.
      os.makedirs(root_directory)
    if not os.path.isdir(root_directory):
      raise _FileCacheError('%s exists but is not a directory' %
                            root_directory)
    self._root_directory = root_directory

  def _GetPath(self,key):
    '''Return the file path used to store the entry for key.'''
    # hashlib yields the same digests as the deprecated md5 module.
    import hashlib
    try:
      digest = hashlib.md5(key)
    except (TypeError, UnicodeError):
      # Text keys that cannot be hashed directly are hashed as UTF-8 bytes.
      digest = hashlib.md5(key.encode('utf-8'))
    hashed_key = digest.hexdigest()
    return os.path.join(self._root_directory,
                        self._GetPrefix(hashed_key),
                        hashed_key)

  def _GetPrefix(self,hashed_key):
    '''Return the nested directory prefix for hashed_key (DEPTH levels).'''
    return os.path.sep.join(hashed_key[0:_FileCache.DEPTH])