Package pyspark :: Module broadcast
[frames] | no frames]

Source Code for Module pyspark.broadcast

 1  # 
 2  # Licensed to the Apache Software Foundation (ASF) under one or more 
 3  # contributor license agreements.  See the NOTICE file distributed with 
 4  # this work for additional information regarding copyright ownership. 
 5  # The ASF licenses this file to You under the Apache License, Version 2.0 
 6  # (the "License"); you may not use this file except in compliance with 
 7  # the License.  You may obtain a copy of the License at 
 8  # 
 9  #    http://www.apache.org/licenses/LICENSE-2.0 
10  # 
11  # Unless required by applicable law or agreed to in writing, software 
12  # distributed under the License is distributed on an "AS IS" BASIS, 
13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14  # See the License for the specific language governing permissions and 
15  # limitations under the License. 
16  # 
17   
18  """ 
19  >>> from pyspark.context import SparkContext 
20  >>> sc = SparkContext('local', 'test') 
21  >>> b = sc.broadcast([1, 2, 3, 4, 5]) 
22  >>> b.value 
23  [1, 2, 3, 4, 5] 
24   
25  >>> from pyspark.broadcast import _broadcastRegistry 
26  >>> _broadcastRegistry[b.bid] = b 
27  >>> from cPickle import dumps, loads 
28  >>> loads(dumps(b)).value 
29  [1, 2, 3, 4, 5] 
30   
31  >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect() 
32  [1, 2, 3, 4, 5, 1, 2, 3, 4, 5] 
33   
34  >>> large_broadcast = sc.broadcast(list(range(10000))) 
35  """ 
36  # Holds broadcasted data received from Java, keyed by its id. 
37  _broadcastRegistry = {} 
38   
39   
40 -def _from_id(bid):
41 from pyspark.broadcast import _broadcastRegistry 42 if bid not in _broadcastRegistry: 43 raise Exception("Broadcast variable '%s' not loaded!" % bid) 44 return _broadcastRegistry[bid]
45 46
47 -class Broadcast(object):
48 - def __init__(self, bid, value, java_broadcast=None, pickle_registry=None):
49 self.value = value 50 self.bid = bid 51 self._jbroadcast = java_broadcast 52 self._pickle_registry = pickle_registry
53
54 - def __reduce__(self):
55 self._pickle_registry.add(self) 56 return (_from_id, (self.bid, ))
57