Skip to content

Commit

Permalink
Merge branch 'development' of github.com:v3io/tutorials
Browse files Browse the repository at this point in the history
  • Loading branch information
Sharon-iguazio committed Sep 15, 2019
2 parents dae664e + 5628fa5 commit 423ea6c
Show file tree
Hide file tree
Showing 4 changed files with 3,034 additions and 538 deletions.
223 changes: 223 additions & 0 deletions .ipynb_checkpoints/importschema-checkpoint.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import v3io_frames as v3f\n",
"import os"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.appName(\"Iguazio file access notebook\").config(\"hive.metastore.uris\", \"thrift://hive:9083\").enableHiveSupport().getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------+\n",
"|databaseName|\n",
"+------------+\n",
"| default|\n",
"| test|\n",
"+------------+\n",
"\n"
]
}
],
"source": [
"spark.sql(\"show databases\").show()"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+---------+-----------+\n",
"|database|tableName|isTemporary|\n",
"+--------+---------+-----------+\n",
"| test| a| false|\n",
"| test| b| false|\n",
"| test| example| false|\n",
"| test| example2| false|\n",
"+--------+---------+-----------+\n",
"\n"
]
}
],
"source": [
"spark.sql(\"show tables in test\").show()"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"def createTable(tableName, df):\n",
" cols = df.dtypes\n",
" createScript = \"CREATE EXTERNAL TABLE test.\" + tableName + \"(\"\n",
" colArray = []\n",
" for colName, colType in cols:\n",
" colArray.append(colName.replace(\" \", \"_\") + \" \" + colType)\n",
" createColsScript = \", \".join(colArray )\n",
" \n",
" script = createScript + createColsScript + \") STORED AS PARQUET LOCATION '\" + dataDir + \"'\"\n",
" print(script)\n",
" return script\n",
" \n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [],
"source": [
"def createTableInUserDirectory(dirName):\n",
" dataDir = \"/User/\" + dirName + \"/\"\n",
" print(dataDir)\n",
" #dataDir = \"/User/v3io/bigdata/User/sample\"\n",
" df = spark.read.parquet(dataDir)\n",
" sqlScript = createTable(dirName, df)\n",
" spark.sql(sqlScript)"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[id: bigint, diagnosis: string, radius_mean: double, texture_mean: double, perimeter_mean: double, area_mean: double, smoothness_mean: double, compactness_mean: double, concavity_mean: double, concave_points_mean: double, symmetry_mean: double, fractal_dimension_mean: double, radius_se: double, texture_se: double, perimeter_se: double, area_se: double, smoothness_se: double, compactness_se: double, concavity_se: double, concave_points_se: double, symmetry_se: double, fractal_dimension_se: double, radius_worst: double, texture_worst: double, perimeter_worst: double, area_worst: double, smoothness_worst: double, compactness_worst: double, concavity_worst: double, concave_points_worst: double, symmetry_worst: double, fractal_dimension_worst: double]"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark.sql(\"select * from test.example2\")"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"/User/examples/\n"
]
},
{
"ename": "AnalysisException",
"evalue": "'Unable to infer schema for Parquet. It must be specified manually.;'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 327\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 329\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o207.parquet.\n: org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;\n\tat org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)\n\tat org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)\n\tat scala.Option.getOrElse(Option.scala:121)\n\tat org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:206)\n\tat org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:392)\n\tat org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)\n\tat org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:622)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\n",
"\nDuring handling of the above exception, another exception occurred:\n",
"\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-45-4faf31fd9a31>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mdir\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mos\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mlistdir\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"/User/v3io/bigdata/User\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mdir\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\".\"\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 6\u001b[0;31m \u001b[0mcreateTableInUserDirectory\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdir\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 7\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m<ipython-input-44-db195e8dde24>\u001b[0m in \u001b[0;36mcreateTableInUserDirectory\u001b[0;34m(dirName)\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataDir\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;31m#dataDir = \"/User/v3io/bigdata/User/sample\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 5\u001b[0;31m \u001b[0mdf\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mparquet\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataDir\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 6\u001b[0m \u001b[0msqlScript\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mcreateTable\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdirName\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msqlScript\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/spark/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mparquet\u001b[0;34m(self, *paths)\u001b[0m\n\u001b[1;32m 301\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'name'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'string'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0;34m'year'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'int'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0;34m'month'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'int'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0;34m'day'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'int'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 302\u001b[0m \"\"\"\n\u001b[0;32m--> 303\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mparquet\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_to_seq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mpaths\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 304\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 305\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0mignore_unicode_prefix\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1255\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1256\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1257\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1258\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1259\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 67\u001b[0m e.java_exception.getStackTrace()))\n\u001b[1;32m 68\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.AnalysisException: '\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 69\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 70\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.catalyst.analysis'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 71\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mAnalysisException\u001b[0m: 'Unable to infer schema for Parquet. It must be specified manually.;'"
]
}
],
"source": [
"#currentDir = os.getcwd()\n",
"#print(currentDir)\n",
"#dirs = os.walk(\"/User\")\n",
"for dir in os.listdir(\"/User/v3io/bigdata/User\"):\n",
" if not dir.startswith(\".\") :\n",
" createTableInUserDirectory(dir)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['.ipynb_checkpoints', 'examples', 'sample']"
]
},
"execution_count": 35,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"os.listdir(\"/User/v3io/bigdata/User\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Loading

0 comments on commit 423ea6c

Please sign in to comment.