Every line of 'pyspark read csv' code snippets is scanned for vulnerabilities by our powerful machine learning engine that combs millions of open source libraries, ensuring your Python code is secure.
37 def loadData(spark,dataFile, dataFileSep=","): 38 #run_logger.log("reading file from ", dataFile) 39 df = spark.read.csv(dataFile, header=False, sep=dataFileSep, inferSchema=True, nanValue="", mode='PERMISSIVE') 40 return df
36 def csvToDataFrame(sqlCtx,rdd,columns=None,sep=",",parseDate=True, nSampl=1000): 37 def toRow(line): 38 return toRowSep(line,sep) 39 rdd_array = rdd.map(toRow) 40 rdd_sql = rdd_array 41 if columns is None: 42 columns = rdd_array.first() 43 rdd_sampl = rdd_array.zipWithIndex().filter(lambda (r,i): (i > 0 and ((nSampl == 0) or (i < nSampl)))).keys() 44 rdd_sql = rdd_array.zipWithIndex().filter(lambda (r,i): i > 0).keys() 45 column_types = evaluateType(rdd_sampl,parseDate) 46 def toSqlRow(row): 47 return toSqlRowWithType(row,column_types) 48 schema = makeSchema(zip(columns,column_types)) 49 return sqlCtx.createDataFrame(rdd_sql.map(toSqlRow), schema=schema)
66 def spark_read_from_jdbc(spark, url, user, password, metastore_table, jdbc_table, driver, 67 save_mode, save_format, fetch_size, num_partitions, 68 partition_column, lower_bound, upper_bound): 69 70 # first set common options 71 reader = set_common_options(spark.read, url, jdbc_table, user, password, driver) 72 73 # now set specific read options 74 if fetch_size: 75 reader = reader.option('fetchsize', fetch_size) 76 if num_partitions: 77 reader = reader.option('numPartitions', num_partitions) 78 if partition_column and lower_bound and upper_bound: 79 reader = reader \ 80 .option('partitionColumn', partition_column) \ 81 .option('lowerBound', lower_bound) \ 82 .option('upperBound', upper_bound) 83 84 reader \ 85 .load() \ 86 .write \ 87 .saveAsTable(metastore_table, format=save_format, mode=save_mode)
35 def csvToDataFrame(self, sqlCtx, rdd, columns=None, sep=",", parseDate=True): 36 """Converts CSV plain text RDD into SparkSQL DataFrame (former SchemaRDD) 37 using PySpark. If columns not given, assumes first row is the header. 38 If separator not given, assumes comma separated 39 """ 40 if self.py_version < 3: 41 def toRow(line): 42 return self.toRowSep(line.encode('utf-8'), sep) 43 else: 44 def toRow(line): 45 return self.toRowSep(line, sep) 46 47 rdd_array = rdd.map(toRow) 48 rdd_sql = rdd_array 49 50 if columns is None: 51 columns = rdd_array.first() 52 rdd_sql = rdd_array.zipWithIndex().filter( 53 lambda r_i: r_i[1] > 0).keys() 54 column_types = self.evaluateType(rdd_sql, parseDate) 55 56 def toSqlRow(row): 57 return self.toSqlRowWithType(row, column_types) 58 59 schema = self.makeSchema(zip(columns, column_types)) 60 61 return sqlCtx.createDataFrame(rdd_sql.map(toSqlRow), schema=schema)
6 def main(): 7 '''Program entry point''' 8 9 #Intialize a spark context 10 with pyspark.SparkContext("local", "PySparkWordCount") as sc: 11 #Get a RDD containing lines from this script file 12 lines = sc.textFile(__file__) 13 #Split each line into words and assign a frequency of 1 to each word 14 words = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)) 15 #count the frequency for words 16 counts = words.reduceByKey(operator.add) 17 #Sort the counts in descending order based on the word frequency 18 sorted_counts = counts.sortBy(lambda x: x[1], False) 19 #Get an iterator over the counts to print a word and its frequency 20 for word,count in sorted_counts.toLocalIterator(): 21 print(u"{} --> {}".format(word, count))