from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap, product
import operator
import os
import sys
import shlex
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread

from pyspark import cloudpickle
from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
    read_from_pickle_file
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler

from py4j.java_collections import ListConverter, MapConverter


__all__ = ["RDD"]


class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._partitionFunc = None

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self._jrdd.cache()
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self

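    # Illustrative sketch of persisting with an explicit storage level, assuming
    # `sc` is a SparkContext and StorageLevel is exported by the pyspark package
    # in this version:
    #
    #   from pyspark import StorageLevel
    #   rdd = sc.parallelize(range(100)).persist(StorageLevel.MEMORY_ONLY)
    #   rdd.is_cached  # True
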
    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        to a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not.
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None

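    # Illustrative sketch of the checkpoint workflow, assuming `sc` is a
    # SparkContext and a writable checkpoint directory (the path below is
    # hypothetical):
    #
    #   sc.setCheckpointDir("/tmp/spark-checkpoints")
    #   rdd = sc.parallelize(range(10)).cache()
    #   rdd.checkpoint()
    #   rdd.count()                 # first job materializes the checkpoint
    #   rdd.isCheckpointed()        # True once the checkpoint has been written
    #   rdd.getCheckpointFile()     # path of the saved checkpoint data
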
    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithSplit(func, preservesPartitioning)

    def mapPartitions(self, f):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator): return f(iterator)
        return self.mapPartitionsWithSplit(func)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator): return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on the default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True)

    def takeSample(self, withReplacement, num, seed):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """

        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if (num < 0):
            raise ValueError

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()

        # If the initial sample didn't turn out large enough, keep trying with
        # new seeds; the oversized multiplier above makes this unlikely.
        while len(samples) < total:
            if seed > sys.maxint - 2:
                seed = -1
            seed += 1
            samples = self.sample(withReplacement, fraction, seed).collect()

        sampler = RDDSampler(withReplacement, fraction, seed + 1)
        sampler.shuffle(samples)
        return samples[0:total]

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        return RDD(self._jrdd.union(other._jrdd), self.ctx)

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator): yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
        # Because elements are batched for serialization, the pairs returned by
        # the Java cartesian may contain Batch objects that need to be unpacked.
        java_cartesian = RDD(self._jrdd.cartesian(other._jrdd), self.ctx)
        def unpack_batches(pair):
            (x, y) = pair
            if type(x) == Batch or type(y) == Batch:
                xs = x.items if type(x) == Batch else [x]
                ys = y.items if type(y) == Batch else [y]
                for pair in product(xs, ys):
                    yield pair
            else:
                yield pair
        return java_cartesian.flatMap(unpack_batches)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
        ['1', '2', '3']
        """
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in pipe.stdout)
        return self.mapPartitions(func)

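    # Illustrative sketch (assuming a POSIX `grep` is available on the workers):
    #
    #   sc.parallelize(['apple', 'banana', 'cherry']).pipe('grep an').collect()
    #   # ['banana']
    #
    # Note that `env`, when given, is passed directly to Popen and therefore
    # replaces the child process's environment rather than extending it.
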
    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        picklesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(picklesInJava))

    def _collect_iterator_through_file(self, iterator):
        # Write the pickled data to a temporary file via the JVM, then read it
        # back and deserialize it in Python; this avoids streaming every record
        # through the Py4J gateway.
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeIteratorToPickleFile(iterator, tempFile.name)
        with open(tempFile.name, 'rb') as tempFile:
            for item in read_from_pickle_file(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

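    # Illustrative sketch: a single stats() pass exposes the aggregates that
    # mean(), variance(), stdev(), sampleStdev() and sampleVariance() below
    # each compute individually, assuming `sc` is a SparkContext:
    #
    #   s = sc.parallelize([1, 2, 3]).stats()
    #   s.mean()      # 2.0
    #   s.variance()  # ~0.667
    #   s.count()     # 3
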
    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects for
        bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts
        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1
        # Take only up to num elements from each partition we try
        mapped = self.mapPartitions(takeUpToNum)
        items = []
        for partition in range(mapped._jrdd.splits().size()):
            iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
            items.extend(self._collect_iterator_through_file(iterator))
            if len(items) >= num:
                break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
        """
        def func(split, iterator):
            return (str(x).encode("utf-8") for x in iterator)
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m
        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    def partitionBy(self, numPartitions, partitionFunc=hash):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        # Form the hash buckets in Python so that only O(numPartitions) pickled
        # batches (one per bucket, each preceded by its split number) are
        # transferred to Java, instead of O(n) individual objects.
        def add_shuffle_key(split, iterator):
            buckets = defaultdict(list)
            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield str(split)
                yield dump_pickle(Batch(items))
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
        partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                      id(partitionFunc))
        jrdd = pairRDD.partitionBy(partitioner).values()
        rdd = RDD(jrdd, self.ctx)
        # Keep the partition function alive on this RDD so that
        # id(partitionFunc), which the Java-side partitioner uses to identify
        # it, stays unique.
        rdd._partitionFunc = partitionFunc
        return rdd

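    # Illustrative sketch: partitionBy accepts any callable as partitionFunc,
    # e.g. routing keys by their first character (assumes `sc` is a SparkContext):
    #
    #   pairs = sc.parallelize([("apple", 1), ("avocado", 2), ("banana", 3)])
    #   pairs.partitionBy(2, partitionFunc=lambda k: ord(k[0])).glom().collect()
    #   # e.g. [[('banana', 3)], [('apple', 1), ('avocado', 2)]]
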
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C. Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        def combineLocally(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)
        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)

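    # Illustrative sketch: computing a per-key average with combineByKey above,
    # assuming `sc` is a SparkContext:
    #
    #   nums = sc.parallelize([("a", 1), ("a", 3), ("b", 5)])
    #   sums = nums.combineByKey(lambda v: (v, 1),
    #                            lambda c, v: (c[0] + v, c[1] + 1),
    #                            lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]))
    #   sums.mapValues(lambda (total, count): float(total) / count).collect()
    #   # e.g. [('a', 2.0), ('b', 5.0)]
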
    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD into numPartitions partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(x.groupByKey().collect())
        [('a', [1, 1]), ('b', [1])]
        """

        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                 numPartitions)

    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

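    # Illustrative sketch of flatMapValues above, assuming `sc` is a SparkContext:
    #
    #   x = sc.parallelize([("a", ["x", "y"]), ("b", ["z"])])
    #   x.flatMapValues(lambda v: v).collect()
    #   # [('a', 'x'), ('a', 'y'), ('b', 'z')]
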
    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        map_values_fn = lambda (k, v): (k, f(v))
        return self.map(map_values_fn, preservesPartitioning=True)

    def groupWith(self, other):
        """
        Alias for cogroup.
        """
        return self.cogroup(other)

    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.cogroup(y).collect())
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)

    def subtractByKey(self, other, numPartitions=None):
        """
        Return each (key, value) pair in C{self} that has no pair with a
        matching key in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtractByKey(y).collect())
        [('b', 4), ('b', 5)]
        """
        filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
        map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
        return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)

    def subtract(self, other, numPartitions=None):
        """
        Return each value in C{self} that is not contained in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        rdd = other.map(lambda x: (x, True))
        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])

    def keyBy(self, f):
        """
        Creates tuples of the elements in this RDD by applying C{f}.

        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
        >>> sorted(x.cogroup(y).collect())
        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
        """
        return self.map(lambda x: (f(x), x))


class PipelinedRDD(RDD):
    """
    Pipelined maps:
    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        if isinstance(prev, PipelinedRDD) and prev._is_pipelinable():
            prev_func = prev.func
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd
        else:
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        func = self.func
        if not self._bypass_serializer and self.ctx.batchSize != 1:
            oldfunc = self.func
            batchSize = self.ctx.batchSize
            def batched_func(split, iterator):
                return batched(oldfunc(split, iterator), batchSize)
            func = batched_func
        cmds = [func, self._bypass_serializer]
        pipe_command = ' '.join(b64enc(cloudpickle.dumps(f)) for f in cmds)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_manifest = self._prev_jrdd.classManifest()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            pipe_command, env, includes, self.preservesPartitioning, self.ctx.pythonExec,
            broadcast_vars, self.ctx._javaAccumulator, class_manifest)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)

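# Illustrative sketch of what the pipelining above buys us, assuming `sc` is a
# SparkContext: successive map-like transformations collapse into a single
# PipelinedRDD whose `func` is the composition of the individual functions, so
# each partition is passed through the Python worker only once:
#
#   doubled = sc.parallelize([1, 2, 3]).map(lambda x: 2 * x)
#   tripled = doubled.map(lambda x: 3 * x)     # still a single PipelinedRDD
#   tripled.collect()                          # [6, 12, 18]
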
def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we exercise multiple batches,
    # even in these small doctest examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()