Source code for poultry.producers

import fileinput
import contextlib

from poultry import consumers
from poultry.stream import from_twitter_api
from poultry.tweet import Tweet, TweetValueError
from poultry.utils import get_file_names


[docs]def consume_stream(target, input_dir=None): """Read lines from the standard input or files in a directory. Behaves as a generator, should receive a .send() call to send a line to the target. """ file_names = get_file_names(input_dir) if input_dir else [] with contextlib.closing(fileinput.FileInput(file_names, openhook=fileinput.hook_compressed)) as lines: targets = [target] if target is not None else [] with consumers.closing(*targets): for line in lines: if not line.strip(): continue if isinstance(line, bytes): line = line.decode('utf-8') if target is not None: result = target.send(line) if result is not consumers.SendNext: yield result else: yield line
[docs]def readline_dir(input_dir, extract_retweets=False, mark_extracted=False): """Read a tweet collection directory. :param str input_dir: the patht to the directory :return: an iterable of Tweet objects """ for l in consume_stream(target=None, input_dir=input_dir): try: tweet = Tweet(l) except TweetValueError: continue else: # TODO: this duplicates consumers.extract_retweets. retweeted_status = tweet.parsed.get('retweeted_status', None) if extract_retweets and retweeted_status: yield Tweet(retweeted_status) yield tweet
[docs]def from_stream(target, source=None, config=None, extract_retweets=False): """Send lines from the standard input, the input directory or the Twitter Streaming API. :param target: a generator to which the read lines are sent. :param source: the path to a directory with tweet files. :param config: the config file :param extract_retweets: Extract retweets from the tweets. """ if extract_retweets: target = consumers.extract_retweets(target) if source in ('twitter://sample', 'twitter://filter'): consumer = from_twitter_api(target, source, config) else: consumer = consume_stream(target, source) try: while True: next(consumer) except StopIteration: pass