from itertools import islice, chain
import csv
from copy import copy
import random
import os
try:
from collections import defaultdict, deque, Counter, namedtuple
except ImportError:
from backport_collections import defaultdict, deque, Counter, namedtuple
import sys
try:
reduce
except NameError:
from functools import reduce
class Nothing(object):
    """ Sentinel type meaning "no attribute name was supplied".

    The class object itself (never an instance) is used as a marker:
    ``DataStream.where`` takes it as the default for ``name`` and
    ``DataStream.getattr`` returns the row unchanged when the name it is
    given ``is Nothing``.
    """
class Datum(object):
    """ Plain attribute bag built from a mapping or from ``(name, value)`` pairs.

    :param attributes: a ``dict`` of attribute names to values, or any
        iterable yielding ``(name, value)`` pairs
    """

    def __init__(self, attributes):
        # A dict is walked through its items; anything else is assumed to
        # already yield (name, value) pairs.
        pairs = attributes.items() if isinstance(attributes, dict) else attributes
        for name, value in pairs:
            setattr(self, name, value)

    def __repr__(self):
        return "Datum({})".format(vars(self))
class DataStream(object):
    """ Foundation for the package - :py:class:`DataStream` allows you to chain
    map/filter/reduce/etc style operations together:

    >>> stream = DataStream(range(10))
    >>> stream.filter(lambda n: n % 2 == 0).map(lambda n: n*5).to_list()
    ... [0, 10, 20, 30, 40]

    DataStreams are evaluated lazily (using generators), providing memory
    efficiency and speed. Using :py:func:`collect` produces a
    :py:class:`DataSet`, which evaluates the whole stream and caches the
    result.

    A stream wraps a single underlying iterator, so it can only be consumed
    once; every derived stream pulls from the same source.
    """

    @staticmethod
    def Stream(iterable,
               transform=lambda row: row,
               predicate=lambda row: True):
        """ Factory used by every chaining method to build the derived stream.

        Always constructs a plain :py:class:`DataStream`, also when called
        through a subclass.

        :param iterable: rows to stream
        :param function transform: applied to each row that passes ``predicate``
        :param function predicate: rows for which this is falsy are skipped
        :rtype: DataStream
        """
        return DataStream(iterable, transform=transform, predicate=predicate)

    @staticmethod
    def Set(iterable):
        """ Factory used by every collecting method to build a :py:class:`DataSet`.

        :param iterable: rows to cache
        :rtype: DataSet
        """
        return DataSet(iterable)

    def __init__(self, source,
                 transform=lambda row: row,
                 predicate=lambda row: True):
        """
        :param source: any iterable; it is wrapped with ``iter`` immediately
        :param function transform: applied to each row that passes ``predicate``
        :param function predicate: rows for which this is falsy are skipped
        """
        self._source = iter(source)
        self._transform = transform
        self._predicate = predicate

    def __iter__(self):
        # The predicate sees the raw source row; the transform is applied
        # only to rows that passed it.
        return (self._transform(row)
                for row in self._source
                if self._predicate(row))

    def __repr__(self):
        return "{}({})".format(self.__class__.__name__, str(self._source))

    def __str__(self):
        return self.__repr__()

    def __next__(self):
        # Skip rows rejected by the predicate; StopIteration from the source
        # propagates to signal exhaustion.
        while True:
            src_next = next(self._source)
            if self._predicate(src_next):
                return self._transform(src_next)

    def next(self):
        """ Python 2 spelling of :py:func:`__next__`. """
        return self.__next__()

    def reduce(self, function, initial=None):
        """ Applying a reducing function to rows in a stream

        :param function function: reducing function, with parameters ``last_iteration``, ``next_value``
        :param initial: initial value for reduce, if None, takes the first element of this stream as initial
        :raises StopIteration: if the stream is empty and ``initial`` is ``None``
        """
        # Iterate through iter(self) rather than next(self) so that
        # list-backed subclasses, whose _source is not an iterator, work too.
        rows = iter(self)
        if initial is None:
            initial = next(rows)
        return reduce(function, rows, initial)

    def reduce_to_dataset(self, function, initial=None):
        """ Applies a reducer over this stream, returning a `DataSet` of the results

        The reduced value must itself be iterable, since it becomes the
        source of the returned set.

        :param function: reducing function, with parameters ``last_iteration``, ``next_value``
        :param initial: initial value for reduce, if None, takes the first element of this stream as initial
        :rtype: DataSet
        """
        return self.Set(self.reduce(function, initial))

    def map(self, function):
        """ Apply a function to each row in this stream

        >>> DataStream(range(5)).map(lambda n: n * 5).to_list()
        ... [0, 5, 10, 15, 20]

        :param function function: function to apply
        :rtype: DataStream
        """
        return self.Stream(self, transform=function)

    def map_method(self, method, *args, **kwargs):
        """ Call named method of each row using supplied args/kwargs

        >>> DataStream(['hi', 'hey', 'yo']).map_method('upper').to_list()
        ... ['HI', 'HEY', 'YO']

        :param str method: name of method to be called
        :rtype: DataStream
        """
        return self.map(lambda row: self.getattr(row, method)(*args, **kwargs))

    def concat(self):
        """ Alias for :py:func:`chain`

        >>> DataStream(['this', 2, None]).map(dir).concat().to_list()
        ... ['__add__', '__class__', '__contains__', '__delattr__', ...]

        :rtype: DataStream
        """
        return self.chain()

    def concat_map(self, function):
        """ :py:func:`map` a function over the stream, then concat it

        >>> DataStream(['this', 2, None]).concat_map(dir).to_list()
        ... ['__add__', '__class__', '__contains__', '__delattr__', ...]

        :param function function: function to apply
        :rtype: DataStream
        """
        return self.map(function).concat()

    def chain(self):
        """ Chains together iterables, flattening them

        >>> DataStream(['this', 2, None]).map(dir).chain().to_list()
        ... ['__add__', '__class__', '__contains__', '__delattr__', ...]

        :rtype: DataStream
        """
        return self.Stream(chain.from_iterable(self))

    def filter(self, filter_fn):
        """ Filters a stream using the passed in predicate function.

        >>> DataStream(range(10)).filter(lambda n: n % 2 == 0).to_list()
        ... [0, 2, 4, 6, 8]

        :param function filter_fn: only passes values for which filter_fn returns a truthy value
        :rtype: DataStream
        """
        return self.Stream(self, predicate=filter_fn)

    def filters(self, filter_fns):
        """ Apply a list of filter functions

        A row passes only if every function accepts it; evaluation stops at
        the first function that rejects the row.

        >>> evens_less_than_six = [lambda n: n < 6, lambda n: n % 2 == 0]
        >>> DataStream(range(10)).filters(evens_less_than_six).to_list()
        ... [0, 2, 4]

        :param list[function] filter_fns: list of filter functions
        :rtype: DataStream
        """
        # Snapshot the predicates so a one-shot iterable is not exhausted by
        # the first row.
        predicates = tuple(filter_fns)
        return self.Stream(
            self, predicate=lambda row: all(pred(row) for pred in predicates))

    def filter_method(self, method, *args, **kwargs):
        """ Filters using a method of the stream row using passed in args/kwargs

        >>> DataStream(['hi', 'h1', 'ho']).filter_method('isalpha').to_list()
        ... ['hi', 'ho']

        :param str method: name of method to be called
        :rtype: DataStream
        """
        return self.filter(lambda row: self.getattr(row, method)(*args, **kwargs))

    def set(self, name, transfer_func=None, value=None):
        """ Sets the named attribute of each row in the stream using the supplied function

        Each row is shallow-copied before the attribute is set, so the
        source rows are left untouched.

        :param name: attribute name
        :param transfer_func: function that takes the row and returns the value to be stored at the named attribute
        :param value: constant stored at the named attribute when ``transfer_func`` is ``None``
        :rtype: DataStream
        """
        def row_setattr(row):
            new_row = copy(row)
            new_value = value if transfer_func is None else transfer_func(row)
            self.setattr(new_row, name, new_value)
            return new_row
        return self.map(row_setattr)

    def get(self, name, default=None):
        """ Gets the named attribute of each row in the stream

        >>> Person = namedtuple('Person', ['name', 'year_born'])
        >>> DataStream([Person('amy', 1987), Person('brad', 1980)]).get('year_born').to_list()
        ... [1987, 1980]

        :param str name: attribute name
        :param default: default value to use if attr name not found in row
        :rtype: DataStream
        """
        def row_getattr(row):
            return self.getattr(row, name) if self.hasattr(row, name) else default
        return self.map(row_getattr)

    def delete(self, attr):
        """ Deletes the named attribute for each row in the stream

        Each row is shallow-copied first; the attribute is removed from the copy.

        :param str attr: attribute name
        :rtype: DataStream
        """
        def obj_del(row):
            new_row = copy(row)
            delattr(new_row, attr)
            return new_row
        return self.map(obj_del)

    def for_each(self, function):
        """ Calls a function for each row in the stream, but passes the row value through

        >>> from pprint import pprint
        >>> DataStream(range(3)).for_each(pprint).execute()
        ... 0
        ... 1
        ... 2

        :param function function: function to call on each row
        :rtype: DataStream
        """
        def apply_fn(row):
            function(row)
            return row
        return self.map(apply_fn)

    def print_each(self):
        """ Prints each row as it flows through the stream.

        :rtype: DataStream
        """
        def printer(row):
            print(row)
        return self.for_each(printer)

    def take(self, n):
        """ Takes n rows from the stream

        >>> DataStream(range(100000)).take(3).to_list()
        ... [0, 1, 2]

        :param int n: number of rows to be taken
        :rtype: DataStream
        """
        return self.Stream(islice(self, 0, n))

    def take_now(self, n):
        """ Like take, but evaluates immediately and returns a :py:class:`DataSet`

        If fewer than ``n`` rows remain, the rows that are available are returned.

        >>> DataStream(range(100000)).take_now(3)
        ... DataSet([0, 1, 2])

        :param int n: number of rows to be taken
        :rtype: DataSet
        """
        # islice stops quietly at exhaustion; a next() per row would leak
        # StopIteration to the caller on a short stream.
        return self.Set(islice(self, max(n, 0)))

    def drop(self, n):
        """ Drops n rows from the stream

        >>> DataStream(range(10)).drop(5).to_list()
        ... [5, 6, 7, 8, 9]

        :param int n: number of rows to be dropped
        :rtype: DataStream
        """
        return self.Stream(islice(self, n, None))

    def collect(self):
        """ Collects the stream into a :py:class:`DataSet`

        >>> DataStream(range(5)).map(lambda n: n * 5).collect()
        ... DataSet([0, 5, 10, 15, 20])

        :rtype: DataSet
        """
        return self.Set(self)

    def collect_as(self, constructor):
        """ Collects using a constructor

        >>> DataStream(range(5)).collect_as(str)
        ... DataSet(['0', '1', '2', '3', '4'])

        :param constructor: class or constructor function
        :rtype: DataSet
        """
        return self.map(constructor).collect()

    def execute(self):
        """ Evaluates the stream (nothing happens until a stream is evaluated)

        Rows are consumed and discarded; the method returns ``None``.

        >>> from pprint import pprint
        >>> DataStream(range(3)).for_each(pprint).execute()
        ... 0
        ... 1
        ... 2
        """
        for _ in self:
            pass

    def count(self):
        """ Counts the number of rows in this stream. This will exhaust a stream!

        >>> DataStream(range(5)).count()
        ... 5

        :rtype: int
        """
        return sum(1 for _ in self)

    def batch(self, batch_size):
        """ Batches rows of a stream in a given chunk size

        The final batch may be shorter than ``batch_size``.

        >>> DataStream(range(10)).batch(2).to_list()
        ... [DataSet([0, 1]), DataSet([2, 3]), DataSet([4, 5]), DataSet([6, 7]), DataSet([8, 9])]

        :param int batch_size: size of each batch
        :rtype: DataStream
        """
        return self.window(batch_size, batch_size)

    def window(self, length, interval):
        """ Windows the rows of a stream in a given length and interval

        Windows start every ``interval`` rows and hold up to ``length`` rows.
        A trailing window shorter than ``length`` is emitted only if it
        contains at least one row that no earlier window contained. When
        ``interval`` is larger than ``length`` the rows between windows are
        skipped.

        >>> DataStream(range(5)).window(3, 2).to_list()
        ... [DataSet([0, 1, 2]), DataSet([2, 3, 4])]

        :param int length: length of window
        :param int interval: distance between windows
        :raises ValueError: if ``length`` or ``interval`` is less than 1
        :rtype: DataStream
        """
        if length < 1 or interval < 1:
            raise ValueError("window length and interval must both be >= 1, "
                             "got length={}, interval={}".format(length, interval))
        fresh_per_window = min(interval, length)  # new rows each later window needs
        skipped = interval - fresh_per_window     # rows falling between windows

        def window_iter():
            rows = iter(self)
            current = list(islice(rows, length))
            while current:
                yield self.Set(current)
                for _ in islice(rows, skipped):
                    pass
                fresh = list(islice(rows, fresh_per_window))
                if not fresh:
                    # Nothing new arrived: the would-be window holds only
                    # rows that were already emitted.
                    return
                # current[interval:] is the overlap carried into the next
                # window (empty when interval >= length).
                current = current[interval:] + fresh
        return self.Stream(window_iter())

    def dedupe(self, key_fn=lambda a: a):
        """ Removes duplicates from a stream, returning only unique values.

        The first row seen for each key is kept.

        >>> DataStream('aaaabcccddd').dedupe().to_list()
        ... ['a', 'b', 'c', 'd']

        :param function key_fn: function returning a hashable value used to determine uniqueness
        :return: DataStream
        """
        seen = set()

        def unique():
            for row in self:
                key = key_fn(row)
                if key not in seen:
                    seen.add(key)
                    yield row
        return self.Stream(unique())

    def sample(self, probability, n):
        """ Sample up to N rows, keeping each row with the given probability

        >>> DataStream(range(100)).sample(0.1, 5)
        ...

        :param float probability: probability that a given row is chosen
        :param int n: maximum number of rows to return
        :rtype: DataStream
        """
        # random.random() is uniform on [0, 1), so "< probability" keeps a
        # row with exactly that probability (0 keeps none, 1 keeps all).
        return self.filter(lambda row: random.random() < probability).take(n)

    def group_by(self, key):
        """ Groups a stream by key, returning a :py:class:`DataSet` of ``(K, list(V))``

        >>> stream = DataStream(list(range(3)) * 3)
        >>> stream.group_by('real').to_dict()
        ... {0: [0, 0, 0], 1: [1, 1, 1], 2: [2, 2, 2]}

        :param str key: attribute name to group by
        :rtype: DataSet
        """
        return self.group_by_fn(lambda row: self.getattr(row, key))

    def group_by_fn(self, key_fn):
        """ Groups a stream by function, returning a :py:class:`DataSet` of ``(K, list(V))``

        >>> stream = DataStream(['hi', 'hey', 'yo', 'sup'])
        >>> stream.group_by_fn(lambda w: len(w)).to_dict()
        ... {2: ['hi', 'yo'], 3: ['hey', 'sup']}

        :param function key_fn: key function returning hashable value to group by
        :rtype: DataSet
        """
        return self.Set(self._index_by(self, key_fn).items())

    def to(self, constructor):
        """ Passes this stream to ``constructor`` and returns the result.

        :param constructor: callable accepting an iterable
        """
        return constructor(self)

    def to_dict(self):
        """ Converts a stream of ``(key, value)`` pairs to a :py:class:`dict`

        >>> stream = DataStream(['hi', 'hey', 'yo', 'sup'])
        >>> stream.group_by_fn(lambda w: len(w)).to_dict()
        ... {2: ['hi', 'yo'], 3: ['hey', 'sup']}

        :rtype: dict
        """
        return dict(self)

    def to_list(self):
        """ Converts a stream to a :py:class:`list`

        >>> DataStream(range(5)).map(lambda n: n * 5).to_list()
        ... [0, 5, 10, 15, 20]

        :rtype: list
        """
        return list(self)

    def to_set(self):
        """ Converts a stream to a :py:class:`set`

        >>> DataStream([1, 2, 3, 4, 2, 3]).to_set()
        ... {1, 2, 3, 4}

        :rtype: set
        """
        return set(self)

    def pipe_to_stdout(self):
        """ Pipes stream to stdout using ``sys.stdout.write`` """
        # An explicit loop: on Python 3 ``map`` is lazy and would write nothing.
        write = sys.stdout.write
        for row in self:
            write(row)

    def count_frequency(self):
        """ Counts frequency of each row in the stream

        >>> DataStream(['a', 'a', 'b', 'c']).count_frequency()
        ... DataSet([('a', 2), ('b', 1), ('c', 1)])

        :rtype: DataSet
        """
        return self.Set(Counter(self).items())

    @staticmethod
    def _index_by(rows, key_fn):
        """ Group ``rows`` into a ``defaultdict(list)`` keyed by ``key_fn(row)``. """
        index = defaultdict(list)
        for row in rows:
            index[key_fn(row)].append(row)
        return index

    @staticmethod
    def join_objects(left, right):
        """ Merge two rows into one :py:class:`Datum` subclass instance.

        The result carries the attributes of both rows (the left row wins
        when both define the same name) plus ``left`` and ``right``
        attributes holding the original rows. Either row may be ``None``.

        :param left: row from the left stream
        :param right: row from the right stream
        :rtype: Datum
        """
        joined_class = type(left.__class__.__name__ + right.__class__.__name__, (Datum,), {})
        attrs = {}
        # Right first, then left, so left-hand values override on conflict.
        attrs.update(get_object_attrs(right))
        attrs.update(get_object_attrs(left))
        attrs['left'] = left
        attrs['right'] = right
        return joined_class(attrs)

    def join(self, how, key, right):
        """ Joins this stream with ``right`` on a shared attribute name

        :param str how: ``left``, ``right``, ``outer``, or ``inner``
        :param DataStream right: :py:class:`DataStream` to be joined with
        :param str key: attribute name to join on
        :raises ValueError: if ``how`` is not one of the four join kinds
        :rtype: DataSet
        """
        if how == 'left':
            return self.left_join(key, right)
        elif how == 'right':
            return self.right_join(key, right)
        elif how == 'inner':
            return self.inner_join(key, right)
        elif how == 'outer':
            return self.outer_join(key, right)
        else:
            raise ValueError("Invalid value for how: {}, must be left, right, "
                             "inner, or outer.".format(str(how)))

    def join_by(self, how, left_key_fn, right_key_fn, right):
        """ Uses two key functions perform a join. Key functions should produce
        hashable types to be used to compare/index dicts.

        :param str how: ``left``, ``right``, ``outer``, or ``inner``
        :param DataStream right: :py:class:`DataStream` to be joined with
        :param function left_key_fn: key function that produces a hashable value from left stream
        :param function right_key_fn: key function that produces a hashable value from right stream
        :raises ValueError: if ``how`` is not one of the four join kinds
        :rtype: DataSet
        """
        if how == 'left':
            return self.left_join_by(left_key_fn, right_key_fn, right)
        elif how == 'right':
            return self.right_join_by(left_key_fn, right_key_fn, right)
        elif how == 'inner':
            return self.inner_join_by(left_key_fn, right_key_fn, right)
        elif how == 'outer':
            return self.outer_join_by(left_key_fn, right_key_fn, right)
        else:
            raise ValueError("Invalid value for how: {}, must be left, right, "
                             "inner, or outer.".format(str(how)))

    def left_join(self, key, right):
        """ Returns a dataset holding every row of this (left) stream, joined
        with the matching rows of ``right`` where there are any

        :param DataStream right: :py:class:`DataStream` to be joined with
        :param str key: attribute name to join on
        :rtype: DataSet
        """
        key_fn = lambda ele: self.getattr(ele, key)
        return self.left_join_by(key_fn, key_fn, right)

    def left_join_by(self, left_key_fn, right_key_fn, right):
        """ Returns a dataset joined using key functions to evaluate equality

        Left rows without a match are joined with ``None``.

        :param function left_key_fn: key function that produces a hashable value from left stream
        :param function right_key_fn: key function that produces a hashable value from right stream
        :param DataStream right: :py:class:`DataStream` to be joined with
        :rtype: DataSet
        """
        right_index = self._index_by(right, right_key_fn)
        joined = []
        for ele in self:
            for other in right_index.get(left_key_fn(ele), [None]):
                joined.append(self.join_objects(ele, other))
        return self.Set(joined)

    def right_join(self, key, right):
        """ Returns a dataset holding every row of ``right``, joined with the
        matching rows of this (left) stream where there are any

        :param DataStream right: :py:class:`DataStream` to be joined with
        :param str key: attribute name to join on
        :rtype: DataSet
        """
        key_fn = lambda ele: self.getattr(ele, key)
        return self.right_join_by(key_fn, key_fn, right)

    def right_join_by(self, left_key_fn, right_key_fn, right):
        """ Returns a dataset joined using key functions to evaluate equality

        Right rows without a match are joined with ``None`` on the left.

        :param function left_key_fn: key function that produces a hashable value from left stream
        :param function right_key_fn: key function that produces a hashable value from right stream
        :param DataStream right: :py:class:`DataStream` to be joined with
        :rtype: DataSet
        """
        left_index = self._index_by(self, left_key_fn)
        joined = []
        for ele in right:
            for other in left_index.get(right_key_fn(ele), [None]):
                # Keep the argument order (left row, right row) so the
                # result's ``left``/``right`` attributes mean the same thing
                # as in every other join.
                joined.append(self.join_objects(other, ele))
        return self.Set(joined)

    def inner_join(self, key, right):
        """ Returns a dataset joined using only keys present in both streams

        :param DataStream right: :py:class:`DataStream` to be joined with
        :param str key: attribute name to join on
        :rtype: DataSet
        """
        key_fn = lambda ele: self.getattr(ele, key)
        return self.inner_join_by(key_fn, key_fn, right)

    def inner_join_by(self, left_key_fn, right_key_fn, right):
        """ Returns a dataset joined using key functions to evaluate equality

        Rows without a match on the other side are dropped.

        :param function left_key_fn: key function that produces a hashable value from left stream
        :param function right_key_fn: key function that produces a hashable value from right stream
        :param DataStream right: :py:class:`DataStream` to be joined with
        :rtype: DataSet
        """
        right_index = self._index_by(right, right_key_fn)
        joined = []
        for ele in self:
            # .get avoids inserting an empty list into the defaultdict for
            # every unmatched left key.
            for other in right_index.get(left_key_fn(ele), ()):
                joined.append(self.join_objects(ele, other))
        return self.Set(joined)

    def outer_join(self, key, right):
        """ Returns a dataset joined using keys present in either stream

        :param DataStream right: :py:class:`DataStream` to be joined with
        :param str key: attribute name to join on
        :rtype: DataSet
        """
        key_fn = lambda ele: self.getattr(ele, key)
        return self.outer_join_by(key_fn, key_fn, right)

    def outer_join_by(self, left_key_fn, right_key_fn, right):
        """ Returns a dataset joined using key functions to evaluate equality

        Rows without a match are joined with ``None`` on the other side.
        Results are grouped by key, in the iteration order of the key set.

        :param function left_key_fn: key function that produces a hashable value from left stream
        :param function right_key_fn: key function that produces a hashable value from right stream
        :param DataStream right: :py:class:`DataStream` to be joined with
        :rtype: DataSet
        """
        left_index = self._index_by(self, left_key_fn)
        right_index = self._index_by(right, right_key_fn)
        joined = []
        for join_key in set(left_index).union(right_index):
            for ele in left_index.get(join_key, [None]):
                for other in right_index.get(join_key, [None]):
                    joined.append(self.join_objects(ele, other))
        return self.Set(joined)

    def pick_attrs(self, attr_names):
        """ Picks attributes from each row in a stream. This is helpful for
        limiting row attrs to only those you want to save in a database, etc.

        >>> Person = namedtuple('Person', ['name', 'year_born'])
        >>> DataStream([Person('amy', 1987), Person('brad', 1980)]).pick_attrs(['year_born']).to_list()
        ... [Datum({'year_born': 1987}), Datum({'year_born': 1980})]

        :param list[str] attr_names: list of attribute names to keep
        :rtype: DataStream
        """
        def attr_filter(row):
            return Datum(dict((name, self.getattr(row, name)) for name in attr_names))
        return self.map(attr_filter)

    def where(self, name=Nothing):
        """ Short hand for common filter functions - ``where`` selects an
        attribute to be filtered on, with a condition like ``gt`` or
        ``contains`` following it. When ``name`` is omitted the condition is
        applied to the row itself.

        >>> Person = namedtuple('Person', ['name', 'year_born'])
        >>> DataStream([Person('amy', 1987), Person('brad', 1980)]).where('year_born').gt(1983).to_list()
        ... [Person(name='amy', year_born=1987)]

        :param str name: attribute name to filter on
        :rtype: FilterRadix
        """
        return FilterRadix(self, name)

    @staticmethod
    def getattr(row, name):
        """ Attribute lookup used by all row-level helpers.

        Returns ``row`` itself when ``name`` is the ``Nothing`` sentinel.
        """
        if name is Nothing:
            return row
        return getattr(row, name)

    @staticmethod
    def hasattr(row, name):
        """ Attribute test used by all row-level helpers. """
        return hasattr(row, name)

    @staticmethod
    def setattr(row, name, value):
        """ Attribute assignment used by all row-level helpers. """
        setattr(row, name, value)

    @classmethod
    def from_file(cls, path):
        """ Stream lines from a file

        The file is opened when the stream is first iterated.

        >>> DataStream.from_file('hamlet.txt').concat_map(str.split).take(7).to_list()
        ... ['The', 'Tragedy', 'of', 'Hamlet,', 'Prince', 'of', 'Denmark']

        :param str path: path to file to be streamed
        :rtype: DataStream
        """
        return cls.Stream(cls.iter_file(path))

    @classmethod
    def from_files(cls, paths):
        """ Stream lines from several files, one file after another

        :param paths: iterable of paths to be streamed
        :rtype: DataStream
        """
        return cls.Stream(cls.iter_files(paths))

    @staticmethod
    def iter_files(paths):
        """ Generator yielding the lines of each file in ``paths`` in turn.

        Each file is closed once its lines are exhausted, or when the
        generator is closed early.
        """
        # The generator simply returns when done: an explicit
        # ``raise StopIteration`` becomes RuntimeError under PEP 479.
        for path in paths:
            with open(path) as source_file:
                for line in source_file:
                    yield line

    @staticmethod
    def iter_file(path):
        """ Generator yielding the lines of the file at ``path``.

        The file is closed once its lines are exhausted, or when the
        generator is closed early.
        """
        with open(path) as source_file:
            for line in source_file:
                yield line

    @classmethod
    def from_csv(cls, path, headers=None, constructor=Datum):
        """ Stream rows from a csv file

        The file is opened immediately. Values are passed to ``constructor``
        as parsed by the :py:mod:`csv` module, i.e. as strings.

        >>> DataStream.from_csv('payments.csv').to_list()
        ... [Datum({'name': 'joe', 'charge': '174.93'}), Datum({'name': 'sally', 'charge': '198.05'}), ...]

        :param str path: path to csv to be streamed
        :param list[str] headers: manual names for headers - if present, first row is pulled in as data, if ``None``, first row is used as headers
        :param constructor: class or function to construct for each row; it receives an iterable of ``(header, value)`` pairs
        :rtype: DataStream
        """
        source_file = open(path)
        rows = cls.iter_csv(source_file)
        if headers is None:
            # Read the header through the csv parser so quoted header cells
            # are handled like data cells; an empty file yields no headers.
            headers = [h.strip() for h in next(rows, [])]
        return cls.Stream(constructor(zip(headers, row)) for row in rows)

    @staticmethod
    def iter_csv(source_file):
        """ Generator yielding parsed csv rows from an open file.

        ``source_file`` is closed when the rows are exhausted or the
        generator is closed early.

        :param source_file: open, readable text file
        """
        try:
            for row in csv.reader(source_file):
                yield row
        finally:
            source_file.close()

    @classmethod
    def from_stdin(cls):
        """ Stream rows from stdin

        :rtype: DataStream
        """
        return cls.Stream(sys.stdin)

    def _write_rows(self, path, mode):
        """ Write each row followed by a newline to ``path`` opened in ``mode``. """
        with open(path, mode) as outfile:
            for row in self:
                # Text mode already translates '\n' to the platform line
                # ending; writing os.linesep would double the '\r' on Windows.
                outfile.write(row + '\n')

    def write_to_file(self, path):
        """ Writes each row (a string) on its own line, replacing the file at ``path``.

        :param str path: path of the file to write
        """
        self._write_rows(path, 'w')

    def append_to_file(self, path):
        """ Appends each row (a string) on its own line to the file at ``path``.

        :param str path: path of the file to append to
        """
        self._write_rows(path, 'a')
class FilterRadix(object):
    """ Builder returned by ``DataStream.where``: holds a stream and an
    attribute name, and each method filters the stream by a condition on
    that attribute.

    Every method returns whatever ``stream.filter`` returns. The attribute
    is read through ``stream.getattr(row, attr_name)``.

    :param stream: object providing ``filter(predicate)`` and ``getattr(row, name)``
    :param attr_name: name of the attribute each condition is tested against
    """

    def __init__(self, stream, attr_name):
        self._source = stream
        self.attr_name = attr_name

    def _where(self, test):
        """ Filter the source, keeping rows whose attribute satisfies ``test``. """
        stream, name = self._source, self.attr_name
        return stream.filter(lambda row: test(stream.getattr(row, name)))

    # -- comparisons -------------------------------------------------------

    def eq(self, value):
        """ Keep rows whose attribute ``== value``. """
        return self._where(lambda attr: attr == value)

    def neq(self, value):
        """ Keep rows whose attribute ``!= value``. """
        return self._where(lambda attr: attr != value)

    def gt(self, value):
        """ Keep rows whose attribute ``> value``. """
        return self._where(lambda attr: attr > value)

    def gteq(self, value):
        """ Keep rows whose attribute ``>= value``. """
        return self._where(lambda attr: attr >= value)

    def lt(self, value):
        """ Keep rows whose attribute ``< value``. """
        return self._where(lambda attr: attr < value)

    def lteq(self, value):
        """ Keep rows whose attribute ``<= value``. """
        return self._where(lambda attr: attr <= value)

    # -- membership --------------------------------------------------------

    def is_in(self, value):
        """ Keep rows whose attribute is contained in ``value``. """
        return self._where(lambda attr: attr in value)

    def not_in(self, value):
        """ Keep rows whose attribute is not contained in ``value``. """
        return self._where(lambda attr: attr not in value)

    def contains(self, value):
        """ Keep rows whose attribute contains ``value``. """
        return self._where(lambda attr: value in attr)

    def doesnt_contain(self, value):
        """ Keep rows whose attribute does not contain ``value``. """
        return self._where(lambda attr: value not in attr)

    # -- length ------------------------------------------------------------

    def len_eq(self, value):
        """ Keep rows whose attribute has ``len(...) == value``. """
        return self._where(lambda attr: len(attr) == value)

    def len_gt(self, value):
        """ Keep rows whose attribute has ``len(...) > value``. """
        return self._where(lambda attr: len(attr) > value)

    def len_lt(self, value):
        """ Keep rows whose attribute has ``len(...) < value``. """
        return self._where(lambda attr: len(attr) < value)

    def len_gteq(self, value):
        """ Keep rows whose attribute has ``len(...) >= value``. """
        return self._where(lambda attr: len(attr) >= value)

    def len_lteq(self, value):
        """ Keep rows whose attribute has ``len(...) <= value``. """
        return self._where(lambda attr: len(attr) <= value)

    def has_length(self, value):
        """ Same condition as :py:func:`len_eq`. """
        return self.len_eq(value)

    def shorter_than(self, value):
        """ Same condition as :py:func:`len_lt`. """
        return self.len_lt(value)

    def longer_than(self, value):
        """ Same condition as :py:func:`len_gt`. """
        return self.len_gt(value)

    # -- truthiness, type and identity ---------------------------------------

    def truthy(self):
        """ Keep rows whose attribute is truthy. """
        return self._where(lambda attr: attr)

    def falsey(self):
        """ Keep rows whose attribute is falsy. """
        return self._where(lambda attr: not attr)

    def isinstance(self, value):
        """ Keep rows whose attribute is an instance of ``value``. """
        return self._where(lambda attr: isinstance(attr, value))

    def notinstance(self, value):
        """ Keep rows whose attribute is not an instance of ``value``. """
        return self._where(lambda attr: not isinstance(attr, value))

    def is_(self, value):
        """ Keep rows whose attribute ``is value``. """
        return self._where(lambda attr: attr is value)

    def is_not(self, value):
        """ Keep rows whose attribute ``is not value``. """
        return self._where(lambda attr: attr is not value)

    # -- strings -----------------------------------------------------------

    def startswith(self, substring):
        """ Keep rows whose attribute ``.startswith(substring)``. """
        return self._where(lambda attr: attr.startswith(substring))

    def endswith(self, substring):
        """ Keep rows whose attribute ``.endswith(substring)``. """
        return self._where(lambda attr: attr.endswith(substring))
class DataSet(DataStream):
    """ Like a :py:class:`DataStream`, but with the source cached as a list.
    Able to perform tasks that require the whole source, like sorting and
    reversing. Because the rows are cached, a set can be iterated repeatedly.
    """

    # Sets with more rows than twice this are abbreviated in __repr__.
    _REPR_EDGE = 5

    def __init__(self, source):
        """
        :param source: iterable of rows; it is fully consumed into a list
        """
        # The base initialiser installs the default transform and predicate;
        # its iterator-wrapped source is then replaced by the cached list.
        super(DataSet, self).__init__(source)
        self._source = list(source)

    def __len__(self):
        return len(self._source)

    def __getitem__(self, item):
        return self._source[item]

    def __repr__(self):
        rows = self._source
        edge = self._REPR_EDGE
        if len(rows) <= 2 * edge:
            # Short enough to show in full; printing head and tail
            # unconditionally would list every row twice.
            body = ', '.join(map(repr, rows))
        else:
            head = ', '.join(map(repr, rows[:edge]))
            tail = ', '.join(map(repr, rows[-edge:]))
            body = "{}, ... {}".format(head, tail)
        return "{}([{}])".format(self.__class__.__name__, body)

    def __str__(self):
        return self.__repr__()

    def take_now(self, n):
        """ Returns the first ``n`` cached rows as a new :py:class:`DataSet`

        If the set holds fewer than ``n`` rows, all of them are returned.

        :param int n: number of rows to be taken
        :rtype: DataSet
        """
        return self.Set(self._source[:max(n, 0)])

    def apply(self, function):
        """ Apply a function to the whole dataset

        The function's return value must be iterable; it becomes the source
        of the returned set.

        :param function function: function to be called on the whole dataset
        :rtype: DataSet
        """
        return self.Set(function(self))

    def call(self, function):
        """ Call a function with the whole dataset, returning the original

        >>> from pprint import pprint
        >>> DataSet([1, 2, 3]).call(pprint)
        ... DataSet([1, 2, 3])
        ... DataSet([1, 2, 3])

        :param function function: function to be called on the whole dataset
        :rtype: DataSet
        """
        function(self)
        return self

    def sort_by(self, key_fn, descending=True):
        """ Sort the :py:class:`DataSet` using the given key function

        The sorted rows are returned as a :py:class:`DataStream`; call
        ``collect()`` on the result to get a :py:class:`DataSet` again.

        >>> Person = namedtuple('Person', ['name', 'year_born'])
        >>> DataSet([Person('amy', 1987), Person('brad', 1980)]).sort_by(lambda p: p.year_born).to_list()
        ... [Person(name='amy', year_born=1987), Person(name='brad', year_born=1980)]

        :param function key_fn: function used select the key used to sort the dataset
        :param bool descending: sorts descending if ``True``
        :rtype: DataStream
        """
        return self.Stream(sorted(self._source, key=key_fn, reverse=descending))

    def reverse(self):
        """ Streams the rows of this :py:class:`DataSet` in reverse order

        The rows are returned as a :py:class:`DataStream`; call ``collect()``
        on the result to get a :py:class:`DataSet` again.

        >>> DataSet(range(5)).reverse().to_list()
        ... [4, 3, 2, 1, 0]

        :rtype: DataStream
        """
        # Slice first so the stream iterates a snapshot of the cached rows.
        return self.Stream(iter(self._source[::-1]))

    def to_stream(self):
        """ Streams from this dataset

        :rtype: DataStream
        """
        return self.Stream(iter(self))

    @classmethod
    def from_csv(cls, path, headers=None, constructor=Datum):
        """ Read a whole csv file into a :py:class:`DataSet`

        Delegates to ``DataStream.from_csv`` and collects the result.

        :param str path: path to csv to be read
        :param list[str] headers: manual names for headers, or ``None`` to use the first row
        :param constructor: class or function to construct for each row
        :rtype: DataSet
        """
        return cls.Set(DataStream.from_csv(path, headers, constructor))
def get_object_attrs(obj):
    """ Return the attributes of ``obj`` as a name -> value mapping.

    * objects with an instance ``__dict__`` return that dict itself (not a copy)
    * namedtuple-style rows (tuples with ``_fields``) return their fields
    * objects with ``__slots__`` return the slots that are currently set
    * anything else, including ``None``, returns an empty dict

    :param obj: any object
    :rtype: dict
    """
    if hasattr(obj, '__dict__'):
        return obj.__dict__

    # namedtuples declare ``__slots__ = ()`` and have no instance __dict__ on
    # Python 3, so read their values through ``_fields`` instead.
    fields = getattr(obj, '_fields', None)
    if isinstance(obj, tuple) and fields is not None:
        return dict(zip(fields, obj))

    if hasattr(obj, '__slots__'):
        slots = obj.__slots__
        if isinstance(slots, str):
            # A single slot may be declared as a bare string; iterating it
            # would walk its characters.
            slots = (slots,)
        missing = object()
        attrs = {}
        for key in slots:
            value = getattr(obj, key, missing)
            if value is not missing:  # a declared slot may never have been set
                attrs[key] = value
        return attrs

    return {}