Every line of 'convert csv to dataframe python' code snippets is scanned for vulnerabilities by our powerful machine learning engine that combs millions of open source libraries, ensuring your Python code is secure.
def csvToDataFrame(self, sqlCtx, rdd, columns=None, sep=",", parseDate=True):
    """Converts CSV plain text RDD into SparkSQL DataFrame (former SchemaRDD)
    using PySpark. If columns not given, assumes first row is the header.
    If separator not given, assumes comma separated.
    """
    # On Python 2 each line must be encoded to UTF-8 bytes before splitting.
    if self.py_version < 3:
        def toRow(line):
            return self.toRowSep(line.encode('utf-8'), sep)
    else:
        def toRow(line):
            return self.toRowSep(line, sep)

    rdd_array = rdd.map(toRow)
    rdd_sql = rdd_array

    # No column names supplied: take them from the first row, then drop it.
    if columns is None:
        columns = rdd_array.first()
        rdd_sql = rdd_array.zipWithIndex().filter(
            lambda r_i: r_i[1] > 0).keys()
    column_types = self.evaluateType(rdd_sql, parseDate)

    def toSqlRow(row):
        return self.toSqlRowWithType(row, column_types)

    schema = self.makeSchema(zip(columns, column_types))

    return sqlCtx.createDataFrame(rdd_sql.map(toSqlRow), schema=schema)
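Because the snippet above is a method on a converter class (note the self parameter and the self.py_version / self.toRowSep helpers), calling it requires an instance. Below is a minimal usage sketch; CsvConverter is a hypothetical name for whatever class hosts the method, and the SQLContext setup matches the legacy sqlCtx argument the snippet expects.

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="csv-to-df")
sqlCtx = SQLContext(sc)

lines = sc.textFile("data.csv")    # plain-text CSV as an RDD of lines
converter = CsvConverter()         # hypothetical class hosting csvToDataFrame
df = converter.csvToDataFrame(sqlCtx, lines, sep=",", parseDate=True)
df.printSchema()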
def csvToDataFrame(sqlCtx, rdd, columns=None, sep=",", parseDate=True, nSampl=1000):
    """Module-level variant of the method above: infers column types from the
    first nSampl data rows only (nSampl=0 scans every row)."""
    def toRow(line):
        return toRowSep(line, sep)

    rdd_array = rdd.map(toRow)
    rdd_sql = rdd_array
    rdd_sampl = rdd_array  # fallback so the sample RDD is defined when columns are supplied

    if columns is None:
        columns = rdd_array.first()
        # Tuple-parameter lambdas like `lambda (r, i):` are Python 2 only;
        # index into the (row, index) pair instead.
        # Drop the header row and cap the type-inference sample at nSampl rows.
        rdd_sampl = rdd_array.zipWithIndex().filter(
            lambda r_i: r_i[1] > 0 and (nSampl == 0 or r_i[1] < nSampl)).keys()
        rdd_sql = rdd_array.zipWithIndex().filter(
            lambda r_i: r_i[1] > 0).keys()
    column_types = evaluateType(rdd_sampl, parseDate)

    def toSqlRow(row):
        return toSqlRowWithType(row, column_types)

    schema = makeSchema(zip(columns, column_types))
    return sqlCtx.createDataFrame(rdd_sql.map(toSqlRow), schema=schema)
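The nSampl parameter in this variant exists to bound the cost of type inference on large files. Modern Spark ships the same capability natively; as a point of comparison, here is a short sketch using the built-in CSV reader, whose inferSchema option likewise scans the data to guess column types.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-to-df").getOrCreate()

# header=True takes column names from the first row; inferSchema=True makes
# Spark scan the data to guess column types, as the nSampl-based sampling does above.
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.printSchema()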
import datetime
import io

import pandas
from sqlalchemy import Table  # assuming SQLAlchemy's Table here

# CsvDataStore, dt, and big_dt come from the surrounding project
# (a CSV loader and two datetime column-type markers).

def fast_postgresql_to_df(table, schema):
    engine = table.bind
    conn = engine.raw_connection()
    with conn.cursor() as cur:
        with io.StringIO() as f:
            table_name = str(table)
            # Wrap anything that is not a plain table (e.g. a subquery)
            # in parentheses so COPY accepts it.
            if not isinstance(table, Table):
                table_name = '({})'.format(table_name)
            sql = "COPY {table_name} TO STDOUT WITH (FORMAT CSV, HEADER TRUE)".format(
                table_name=table_name)
            cur.copy_expert(sql, f)

            f.seek(0)
            # reading csv
            csv_loader = CsvDataStore(schema, f, with_header=True)
            df = csv_loader.load()
            # df = pandas.read_csv(f)
    for col in schema.cols:
        if isinstance(col, dt):
            # converting datetime column; errors='coerce' replaces the removed
            # coerce=True keyword and turns unparseable values into NaT
            df[col.name] = pandas.to_datetime(
                df[col.name], format="%Y-%m-%d %H:%M:%S", errors='coerce')
        if isinstance(col, big_dt):
            # converting big_dt column row by row via strptime
            strptime = datetime.datetime.strptime
            parse_func = (lambda x: strptime(x, "%Y-%m-%d %H:%M:%S"))
            df[col.name] = df[col.name].map(parse_func, na_action='ignore')
    return df
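The commented-out pandas.read_csv(f) line hints at a simpler path that skips the project-specific CsvDataStore. The sketch below shows the same COPY-to-buffer pattern with psycopg2 and pandas alone; the connection string, table name, and the created_at column are placeholders.

import io

import pandas
import psycopg2

# Placeholder DSN and table name; adjust for your database.
conn = psycopg2.connect("dbname=mydb user=me")
with conn.cursor() as cur, io.StringIO() as buf:
    # Stream the table out as CSV into an in-memory buffer, then rewind it.
    cur.copy_expert("COPY my_table TO STDOUT WITH (FORMAT CSV, HEADER TRUE)", buf)
    buf.seek(0)
    df = pandas.read_csv(buf, parse_dates=["created_at"])  # hypothetical datetime column
conn.close()
print(df.dtypes)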