From 4fa90105b0e153585054f40df0b5833e0e2ebabb Mon Sep 17 00:00:00 2001 From: Dmitry Pavlov Date: Tue, 10 Sep 2019 18:15:32 +0000 Subject: [PATCH 1/7] ... --- .../importschema-checkpoint.ipynb | 223 ++++++++++++++++++ test | 0 2 files changed, 223 insertions(+) create mode 100644 .ipynb_checkpoints/importschema-checkpoint.ipynb create mode 100644 test diff --git a/.ipynb_checkpoints/importschema-checkpoint.ipynb b/.ipynb_checkpoints/importschema-checkpoint.ipynb new file mode 100644 index 00000000..b9293796 --- /dev/null +++ b/.ipynb_checkpoints/importschema-checkpoint.ipynb @@ -0,0 +1,223 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import v3io_frames as v3f\n", + "import os" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "spark = SparkSession.builder.appName(\"Iguazio file access notebook\").config(\"hive.metastore.uris\", \"thrift://hive:9083\").enableHiveSupport().getOrCreate()" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+------------+\n", + "|databaseName|\n", + "+------------+\n", + "| default|\n", + "| test|\n", + "+------------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"show databases\").show()" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------+---------+-----------+\n", + "|database|tableName|isTemporary|\n", + "+--------+---------+-----------+\n", + "| test| a| false|\n", + "| test| b| false|\n", + "| test| example| false|\n", + "| test| example2| false|\n", + "+--------+---------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "spark.sql(\"show tables in test\").show()" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "\n", + "def createTable(tableName, df):\n", + " cols = df.dtypes\n", + " createScript = \"CREATE EXTERNAL TABLE test.\" + tableName + \"(\"\n", + " colArray = []\n", + " for colName, colType in cols:\n", + " colArray.append(colName.replace(\" \", \"_\") + \" \" + colType)\n", + " createColsScript = \", \".join(colArray )\n", + " \n", + " script = createScript + createColsScript + \") STORED AS PARQUET LOCATION '\" + dataDir + \"'\"\n", + " print(script)\n", + " return script\n", + " \n", + "\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 44, + "metadata": {}, + "outputs": [], + "source": [ + "def createTableInUserDirectory(dirName):\n", + " dataDir = \"/User/\" + dirName + \"/\"\n", + " print(dataDir)\n", + " #dataDir = \"/User/v3io/bigdata/User/sample\"\n", + " df = spark.read.parquet(dataDir)\n", + " sqlScript = createTable(dirName, df)\n", + " spark.sql(sqlScript)" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[id: bigint, diagnosis: string, radius_mean: double, texture_mean: double, perimeter_mean: double, area_mean: double, smoothness_mean: double, compactness_mean: double, concavity_mean: double, concave_points_mean: double, symmetry_mean: double, fractal_dimension_mean: double, radius_se: double, texture_se: double, perimeter_se: double, area_se: double, smoothness_se: double, compactness_se: double, concavity_se: 
double, concave_points_se: double, symmetry_se: double, fractal_dimension_se: double, radius_worst: double, texture_worst: double, perimeter_worst: double, area_worst: double, smoothness_worst: double, compactness_worst: double, concavity_worst: double, concave_points_worst: double, symmetry_worst: double, fractal_dimension_worst: double]" + ] + }, + "execution_count": 23, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.sql(\"select * from test.example2\")" + ] + }, + { + "cell_type": "code", + "execution_count": 45, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "/User/examples/\n" + ] + }, + { + "ename": "AnalysisException", + "evalue": "'Unable to infer schema for Parquet. It must be specified manually.;'", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 327\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 329\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o207.parquet.\n: org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. 
It must be specified manually.;\n\tat org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)\n\tat org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:207)\n\tat scala.Option.getOrElse(Option.scala:121)\n\tat org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:206)\n\tat org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:392)\n\tat org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)\n\tat org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:622)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\n", + "\nDuring handling of the above exception, another exception occurred:\n", + "\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mdir\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mos\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mlistdir\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"/User/v3io/bigdata/User\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mdir\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\".\"\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 6\u001b[0;31m \u001b[0mcreateTableInUserDirectory\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdir\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 7\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m\u001b[0m in \u001b[0;36mcreateTableInUserDirectory\u001b[0;34m(dirName)\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataDir\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;31m#dataDir = \"/User/v3io/bigdata/User/sample\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 5\u001b[0;31m \u001b[0mdf\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mparquet\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataDir\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 6\u001b[0m \u001b[0msqlScript\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mcreateTable\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdirName\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msqlScript\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/spark/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mparquet\u001b[0;34m(self, *paths)\u001b[0m\n\u001b[1;32m 301\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'name'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'string'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0;34m'year'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'int'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0;34m'month'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'int'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0;34m'day'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'int'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 302\u001b[0m \"\"\"\n\u001b[0;32m--> 303\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mparquet\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_to_seq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mpaths\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 304\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 305\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0mignore_unicode_prefix\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1255\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1256\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1257\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1258\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1259\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 67\u001b[0m e.java_exception.getStackTrace()))\n\u001b[1;32m 68\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.AnalysisException: '\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 69\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': 
'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 70\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.catalyst.analysis'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 71\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mAnalysisException\u001b[0m: 'Unable to infer schema for Parquet. It must be specified manually.;'" + ] + } + ], + "source": [ + "#currentDir = os.getcwd()\n", + "#print(currentDir)\n", + "#dirs = os.walk(\"/User\")\n", + "for dir in os.listdir(\"/User/v3io/bigdata/User\"):\n", + " if not dir.startswith(\".\") :\n", + " createTableInUserDirectory(dir)\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['.ipynb_checkpoints', 'examples', 'sample']" + ] + }, + "execution_count": 35, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "os.listdir(\"/User/v3io/bigdata/User\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/test b/test new file mode 100644 index 00000000..e69de29b From 59fa05c9e8695f1e9cf7b85b30899d09c844738c Mon Sep 17 00:00:00 2001 From: Dmitry Pavlov Date: Fri, 13 Sep 2019 19:01:37 +0000 Subject: [PATCH 2/7] finished notebook --- getting-started/importschema.ipynb | 204 +++++++++++++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 getting-started/importschema.ipynb diff --git a/getting-started/importschema.ipynb b/getting-started/importschema.ipynb new file mode 100644 index 00000000..790019e3 --- /dev/null +++ b/getting-started/importschema.ipynb @@ -0,0 +1,204 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# This notebook is to help automatically import parquet schema to hive" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Below is import of all needed dependencies. And in this sell you should pass path where parquet files located. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "/examples/parquet_example\n" + ] + } + ], + "source": [ + "# import pandas as pd\n", + "# import v3io_frames as v3f\n", + "import os\n", + "\n", + "# Set path where parquet files located. They can be nested in folder. \n", + "filepath = os.path.join('/examples/parquet_example')\n", + "print(filepath)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here is creating of spark context with hive support." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "spark = SparkSession.builder.appName(\"Import parquet schema to hive\").config(\"hive.metastore.uris\", \"thrift://hive:9083\").enableHiveSupport().getOrCreate()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Define function below for getting sql script needed for creating table in hive using dataframe types as columns to table" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "def getCreateTableScript(databaseName, tableName, df):\n", + " cols = df.dtypes\n", + " createScript = \"CREATE EXTERNAL TABLE \" + databaseName + \".\" + tableName + \"(\"\n", + " colArray = []\n", + " for colName, colType in cols:\n", + " colArray.append(colName.replace(\" \", \"_\") + \" \" + colType)\n", + " createColsScript = \", \".join(colArray )\n", + " \n", + " script = createScript + createColsScript + \") STORED AS PARQUET LOCATION '\" + tableName + \"'\"\n", + " print(script)\n", + " return script\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "#define main function for creating table where arqument 'path' is path to parquet files \n", + "def createTable(databaseName, tableName, path): \n", + " df = spark.read.parquet(path)\n", + " sqlScript = getCreateTableScript(databaseName, tableName, df)\n", + " spark.sql(sqlScript)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Here is an example where you provide a path to a folder with parquet files and their are uploaded\n", + "\n", + "Write here name of database. 
Database will create if it doesn't exist.\n", + "In this sell code goes over all files and dirs in provided path and using them for creating table.\n", + "File should be ended with .parquet format\n", + "Directory (in which stored parquet files) should be started with \".\"" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "ename": "AnalysisException", + "evalue": "'org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima);'", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 327\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 329\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o40.sql.\n: org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima);\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.doCreateDatabase(HiveExternalCatalog.scala:163)\n\tat org.apache.spark.sql.catalyst.catalog.ExternalCatalog.createDatabase(ExternalCatalog.scala:69)\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:207)\n\tat org.apache.spark.sql.execution.command.CreateDatabaseCommand.run(ddl.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)\n\tat org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)\n\tat org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)\n\tat org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)\n\tat 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)\n\tat org.apache.spark.sql.Dataset.(Dataset.scala:190)\n\tat org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)\n\tat org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima)\n\tat org.apache.hadoop.hive.ql.metadata.Hive.createDatabase(Hive.java:312)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply$mcV$sp(HiveClientImpl.scala:303)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:303)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:303)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl.createDatabase(HiveClientImpl.scala:302)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateDatabase$1.apply$mcV$sp(HiveExternalCatalog.scala:164)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateDatabase$1.apply(HiveExternalCatalog.scala:164)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateDatabase$1.apply(HiveExternalCatalog.scala:164)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)\n\t... 
26 more\nCaused by: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:14412)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:14380)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:14314)\n\tat org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:625)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:612)\n\tat org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:644)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)\n\tat com.sun.proxy.$Proxy13.createDatabase(Unknown Source)\n\tat org.apache.hadoop.hive.ql.metadata.Hive.createDatabase(Hive.java:306)\n\t... 38 more\n", + "\nDuring handling of the above exception, another exception occurred:\n", + "\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0mdatabaseName\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m\"dima\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 2\u001b[0;31m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"CREATE DATABASE IF NOT EXISTS \"\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mdatabaseName\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 3\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mfileOrDir\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mos\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mlistdir\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfilepath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/spark/python/pyspark/sql/session.py\u001b[0m in \u001b[0;36msql\u001b[0;34m(self, sqlQuery)\u001b[0m\n\u001b[1;32m 708\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row1'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m2\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row2'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m3\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row3'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 709\u001b[0m \"\"\"\n\u001b[0;32m--> 710\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mDataFrame\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jsparkSession\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msqlQuery\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_wrapped\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 711\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 712\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0msince\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m2.0\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1255\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1256\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1257\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1258\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1259\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 67\u001b[0m e.java_exception.getStackTrace()))\n\u001b[1;32m 68\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.AnalysisException: '\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 69\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 70\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.catalyst.analysis'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 71\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31mAnalysisException\u001b[0m: 'org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima);'" + ] + } + ], + "source": [ + "databaseName = \"hive\"\n", + "spark.sql(\"CREATE DATABASE IF NOT EXISTS \" + databaseName)\n", + "\n", + "\n", + "for fileOrDir in os.listdir(filepath):\n", + " if fileOrDir.endswith(\".parquet\") :\n", + " createTable(databaseName, fileOrDir.split(\".parquet\")[0], filepath + fileOrDir)\n", + " elif not fileOrDir.startswith(\".\") :\n", + " createTable(databaseName, fileOrDir, filepath + fileOrDir + \"/*\")\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Test how it works" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# test how the tables were saved\n", + "spark.sql(\"show tables in \" + databaseName).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# test how saving to table works\n", + "tableName = \"example1\"\n", + "spark.sql(\"select * from \" + databaseName + \".\" + tableName)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From 8a20d03e6dbada1525fa0361b439bf050ece9998 Mon Sep 17 00:00:00 2001 From: Dmitry Pavlov Date: Fri, 13 Sep 2019 19:06:06 +0000 Subject: [PATCH 3/7] removed useless code --- test | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 test diff --git a/test b/test deleted file mode 100644 index e69de29b..00000000 From 8413fa19f159aff58fee637b2825eacc87e06ec9 Mon Sep 17 00:00:00 2001 From: Dmitry Pavlov Date: Fri, 13 Sep 2019 18:47:02 -0400 Subject: [PATCH 4/7] fixing commit --- getting-started/importschema.ipynb | 212 ------------------------ getting-started/parquettohive.ipynb | 246 ++++++++++++++++++++++++++++ 2 files changed, 246 insertions(+), 212 deletions(-) delete mode 100644 getting-started/importschema.ipynb create mode 100644 getting-started/parquettohive.ipynb diff --git a/getting-started/importschema.ipynb b/getting-started/importschema.ipynb deleted file mode 100644 index ea1d2fda..00000000 --- a/getting-started/importschema.ipynb +++ /dev/null @@ -1,212 +0,0 @@ -{ - "cells": [ - { - - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# This notebook is to help automatically import parquet schema to hive" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Below is import of all needed dependencies. And in this sell you should pass path where parquet files located. 
" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "/examples/parquet_example\n" - ] - } - ], - "source": [ - "# import pandas as pd\n", - "# import v3io_frames as v3f\n", - "import os\n", - "\n", - "# Set path where parquet files located. They can be nested in folder. \n", - "filepath = os.path.join('/examples/parquet_example')\n", - "print(filepath)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Here is creating of spark context with hive support." - - ] - }, - { - "cell_type": "code", - - "execution_count": 2, - - "metadata": {}, - "outputs": [], - "source": [ - "from pyspark.sql import SparkSession\n", - "spark = SparkSession.builder.appName(\"Import parquet schema to hive\").config(\"hive.metastore.uris\", \"thrift://hive:9083\").enableHiveSupport().getOrCreate()" - ] - }, - { - - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Define function below for getting sql script needed for creating table in hive using dataframe types as columns to table" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [], - "source": [ - "def getCreateTableScript(databaseName, tableName, df):\n", - - " cols = df.dtypes\n", - " createScript = \"CREATE EXTERNAL TABLE \" + databaseName + \".\" + tableName + \"(\"\n", - " colArray = []\n", - " for colName, colType in cols:\n", - " colArray.append(colName.replace(\" \", \"_\") + \" \" + colType)\n", - " createColsScript = \", \".join(colArray )\n", - " \n", - " script = createScript + createColsScript + \") STORED AS PARQUET LOCATION '\" + tableName + \"'\"\n", - " print(script)\n", - " return script\n", - " " - ] - }, - { - "cell_type": "code", - - "execution_count": 4, - "metadata": {}, - "outputs": [], - "source": [ - "#define main function for creating table where arqument 'path' is path to parquet files \n", - "def createTable(databaseName, tableName, path): \n", - " df = spark.read.parquet(path)\n", - " sqlScript = getCreateTableScript(databaseName, tableName, df)\n", - - " spark.sql(sqlScript)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Here is an example where you provide a path to a folder with parquet files and their are uploaded\n", - "\n", - "Write here name of database. 
Database will create if it doesn't exist.\n", - "In this sell code goes over all files and dirs in provided path and using them for creating table.\n", - "File should be ended with .parquet format\n", - "Directory (in which stored parquet files) should be started with \".\"" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": {}, - "outputs": [ - { - "ename": "AnalysisException", - "evalue": "'org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima);'", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", - "\u001b[0;32m/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 62\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 64\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", - "\u001b[0;32m/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 327\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m 329\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", - "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o40.sql.\n: org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima);\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.doCreateDatabase(HiveExternalCatalog.scala:163)\n\tat org.apache.spark.sql.catalyst.catalog.ExternalCatalog.createDatabase(ExternalCatalog.scala:69)\n\tat org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:207)\n\tat org.apache.spark.sql.execution.command.CreateDatabaseCommand.run(ddl.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)\n\tat org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)\n\tat org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)\n\tat org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)\n\tat 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)\n\tat org.apache.spark.sql.Dataset.(Dataset.scala:190)\n\tat org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)\n\tat org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima)\n\tat org.apache.hadoop.hive.ql.metadata.Hive.createDatabase(Hive.java:312)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply$mcV$sp(HiveClientImpl.scala:303)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:303)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:303)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255)\n\tat org.apache.spark.sql.hive.client.HiveClientImpl.createDatabase(HiveClientImpl.scala:302)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateDatabase$1.apply$mcV$sp(HiveExternalCatalog.scala:164)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateDatabase$1.apply(HiveExternalCatalog.scala:164)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$doCreateDatabase$1.apply(HiveExternalCatalog.scala:164)\n\tat org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)\n\t... 
26 more\nCaused by: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:14412)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:14380)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:14314)\n\tat org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:625)\n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:612)\n\tat org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:644)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)\n\tat com.sun.proxy.$Proxy13.createDatabase(Unknown Source)\n\tat org.apache.hadoop.hive.ql.metadata.Hive.createDatabase(Hive.java:306)\n\t... 38 more\n", - "\nDuring handling of the above exception, another exception occurred:\n", - "\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)", - "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0mdatabaseName\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m\"dima\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 2\u001b[0;31m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"CREATE DATABASE IF NOT EXISTS \"\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mdatabaseName\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 3\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mfileOrDir\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mos\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mlistdir\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfilepath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", - "\u001b[0;32m/spark/python/pyspark/sql/session.py\u001b[0m in \u001b[0;36msql\u001b[0;34m(self, sqlQuery)\u001b[0m\n\u001b[1;32m 708\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row1'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m2\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row2'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m3\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row3'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 709\u001b[0m \"\"\"\n\u001b[0;32m--> 710\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mDataFrame\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jsparkSession\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msqlQuery\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_wrapped\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 711\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 712\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0msince\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m2.0\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", - "\u001b[0;32m/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1255\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1256\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1257\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1258\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1259\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", - "\u001b[0;32m/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 67\u001b[0m e.java_exception.getStackTrace()))\n\u001b[1;32m 68\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.AnalysisException: '\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 69\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 70\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'org.apache.spark.sql.catalyst.analysis'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 71\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mAnalysisException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", - "\u001b[0;31mAnalysisException\u001b[0m: 'org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/User/spark-warehouse/dima.db, failed to create database dima);'" - ] - } - ], - "source": [ - "databaseName = \"hive\"\n", - "spark.sql(\"CREATE DATABASE IF NOT EXISTS \" + databaseName)\n", - "\n", - "\n", - "for fileOrDir in os.listdir(filepath):\n", - " if fileOrDir.endswith(\".parquet\") :\n", - " createTable(databaseName, fileOrDir.split(\".parquet\")[0], filepath + fileOrDir)\n", - " elif not fileOrDir.startswith(\".\") :\n", - " createTable(databaseName, fileOrDir, filepath + fileOrDir + \"/*\")\n", - "\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Test how it works" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# test how the tables were saved\n", - "spark.sql(\"show tables in \" + databaseName).show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# test how saving to table works\n", - "tableName = \"example1\"\n", - "spark.sql(\"select * from \" + databaseName + \".\" + tableName)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.8" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/getting-started/parquettohive.ipynb b/getting-started/parquettohive.ipynb new file mode 100644 index 00000000..2aa418d9 --- /dev/null +++ b/getting-started/parquettohive.ipynb @@ -0,0 +1,246 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# This notebook is to help automatically import parquet schema to hive" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Below is import of all needed dependencies. And in this sell you should pass path where parquet files located. " + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import os" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here is creating of spark context with hive support." 
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.sql import SparkSession\n",
+    "spark = SparkSession.builder.appName(\"Import parquet schema to hive\").config(\"hive.metastore.uris\", \"thrift://hive:9083\").enableHiveSupport().getOrCreate()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The function below generates the SQL script for creating a Hive table, using the DataFrame's column names and types as the table columns."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def getCreateTableScript(databaseName, tableName, df):\n",
+    "    cols = df.dtypes\n",
+    "    createScript = \"CREATE EXTERNAL TABLE \" + databaseName + \".\" + tableName + \"(\"\n",
+    "    colArray = []\n",
+    "    for colName, colType in cols:\n",
+    "        colArray.append(colName.replace(\" \", \"_\") + \" \" + colType)\n",
+    "    createColsScript = \", \".join(colArray )\n",
+    "    \n",
+    "    script = createScript + createColsScript + \") STORED AS PARQUET LOCATION '\" + tableName + \"'\"\n",
+    "    print(script)\n",
+    "    return script\n",
+    "    "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Define the main function for creating a table; the argument 'path' is the path to the parquet files.\n",
+    "def createTable(databaseName, tableName, path): \n",
+    "    df = spark.read.parquet(path)\n",
+    "    sqlScript = getCreateTableScript(databaseName, tableName, df)\n",
+    "    spark.sql(sqlScript)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## One file example"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "CREATE EXTERNAL TABLE test.table_from_single_file(id bigint, diagnosis string, radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) STORED AS PARQUET LOCATION 'table_from_single_file'\n"
+     ]
+    }
+   ],
+   "source": [
+    "# Set the path to a specific parquet file.\n",
+    "my_parquet_file_path = os.path.join('v3io://bigdata/examples/example1.parquet')\n",
+    "createTable(\"test\",\"table_from_single_file\",my_parquet_file_path)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## One folder example"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "CREATE EXTERNAL TABLE test.table_from_dir(id bigint, diagnosis string, radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se 
double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) STORED AS PARQUET LOCATION 'table_from_dir'\n"
+     ]
+    }
+   ],
+   "source": [
+    "# Set the path to a folder that contains parquet files.\n",
+    "folder_path = os.path.join('v3io://users/admin/examples/parquet_examples/dir1/*')\n",
+    "createTable(\"test\",\"table_from_dir\",folder_path)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Multiple files and folders example\n",
+    "\n",
+    "Specify the name of the database and the path to a folder that contains all the parquet files (or folders of parquet files). The database must already exist.\n",
+    "The code in this cell iterates over all the files and directories in the provided path and creates a table from each of them.\n",
+    "Files must end with the .parquet extension.\n",
+    "Directories (in which the parquet files are stored) must not start with \".\"; hidden directories are skipped.\n",
+    "The name of the directory or file is used as the table name."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "CREATE EXTERNAL TABLE test.dir1(id bigint, diagnosis string, radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) STORED AS PARQUET LOCATION 'dir1'\n",
+      "CREATE EXTERNAL TABLE test.example1(id bigint, diagnosis string, radius_mean double, texture_mean double, perimeter_mean double, area_mean double, smoothness_mean double, compactness_mean double, concavity_mean double, concave_points_mean double, symmetry_mean double, fractal_dimension_mean double, radius_se double, texture_se double, perimeter_se double, area_se double, smoothness_se double, compactness_se double, concavity_se double, concave_points_se double, symmetry_se double, fractal_dimension_se double, radius_worst double, texture_worst double, perimeter_worst double, area_worst double, smoothness_worst double, compactness_worst double, concavity_worst double, concave_points_worst double, symmetry_worst double, fractal_dimension_worst double) STORED AS PARQUET LOCATION 'example1'\n"
+     ]
+    }
+   ],
+   "source": [
+    "databaseName = \"test\"\n",
+    "filepath = \"/v3io/users/admin/examples/parquet_examples\"\n",
+    "\n",
+    "for fileOrDir in os.listdir(filepath):\n",
+    "    if fileOrDir.endswith(\".parquet\") :\n",
+    "        createTable(databaseName, fileOrDir.split(\".parquet\")[0], filepath.replace(\"/v3io/\", \"v3io://\", 1) + \"/\" + fileOrDir)\n",
+    "    elif not fileOrDir.startswith(\".\") :\n",
+    "        createTable(databaseName, fileOrDir, filepath.replace(\"/v3io/\", \"v3io://\", 1) + \"/\" + fileOrDir + \"/*\")\n",
+    "\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    
"# Test how it works" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+------------+\n", + "|databaseName|\n", + "+------------+\n", + "| default|\n", + "| test|\n", + "+------------+\n", + "\n", + "+--------+---------+-----------+\n", + "|database|tableName|isTemporary|\n", + "+--------+---------+-----------+\n", + "+--------+---------+-----------+\n", + "\n" + ] + } + ], + "source": [ + "# test how the tables were saved\n", + "#spark.sql(\"drop database test CASCADE\")\n", + "spark.sql(\"drop table \" + databaseName + \".example1\")\n", + "spark.sql(\"show databases\").show()\n", + "spark.sql(\"show tables in \" + databaseName).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# test how saving to table works\n", + "tableName = \"example1\"\n", + "spark.sql(\"select * from \" + databaseName + \".\" + tableName)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From dd020acd97b98ed530064f06b7b2a6582c1045e9 Mon Sep 17 00:00:00 2001 From: Sharon Lifshitz Date: Sun, 15 Sep 2019 18:08:01 +0300 Subject: [PATCH 5/7] [DOC] Frames GS MB doc review (#1) (v2.3.0 outputs) [IG-12272 IG-12092] --- getting-started/frames.ipynb | 788 +++++++++++++++++++++++------------ 1 file changed, 515 insertions(+), 273 deletions(-) diff --git a/getting-started/frames.ipynb b/getting-started/frames.ipynb index 71f03f7f..e0097fb7 100644 --- a/getting-started/frames.ipynb +++ b/getting-started/frames.ipynb @@ -4,28 +4,60 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Using Iguazio Frames Library for High-Performance Data Access \n", - "iguazio `v3io_frames` is a streaming oriented multi-model (generic) data API which allow high-speed data loading and storing
\n", - "frames currently support iguazio key/value, time-series, and streaming data models (called backends), additional backends will be added.\n", + "# Using the V3IO Frames Library for High-Performance Data Access \n", "\n", - "For detailed description of the Frames API go to https://github.com/v3io/frames\n", + "- [Overview](#frames-overview)\n", + "- [Initialization](#frames-init)\n", + "- [Working with NoSQL Tables (\"kv\" Backend)](#frames-kv)\n", + "- [Working with Time-Series Databases (\"tsdb\" Backend)](#frames-tsdb)\n", + "- [Working with Streams (\"stream\" Backend)](#frames-stream)\n", + "- [Cleanup](#frames-cleanup)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "## Overview\n", "\n", - "to use frames you first create a `client` and provide it the session and credential details, the client is used to for 5 basic operations:\n", + "[V3IO Frames](https://github.com/v3io/frames) (**\"Frames\"**) is a multi-model open-source data-access library, developed by Iguazio, which provides a unified high-performance DataFrame API for loading, storing, and accessing data in the data store of the Iguazio Data Science Platform (**\"the platform**).\n", + "Frames currently supports the NoSQL (key/value), stream, and time-series (TSDB) data models via its \"kv\", \"stream\", and \"tsdb\" backends.\n", + "\n", + "To use Frames, you first need to import the **v3io_frames** library and create and initialize a client object — an instance of the`Client` class.
\n", + "The `Client` class features the following object methods for supporting basic data operations:\n", + "\n", + "- `create` — create a new NoSQL or TSDB table or a stream (\"the backend\").\n", + "- `delete` — delete the backend.\n", + "- `read` — read data from the backend (as a pandas DataFrame or DataFrame iterator).\n", + "- `write` — write one or more DataFrames to the backend.\n", + "- `execute` — execute a command on the backend. Each backend may support multiple commands.\n", + "\n", + "\n", + "For a detailed description of the Frames API, see the [Frames documentation](https://github.com/v3io/frames/blob/development/README.md).
\n", + "For more help and usage details, use the internal API help — `.?` in Jupyter Notebook or `print(..__doc__)`.
\n", + "For example, the following command returns information about the read operation for a client object named `client`:\n", "```\n", - " create - create a new time-series table or a stream \n", - " delete - delete the table or stream\n", - " read - read data from the backend (as pandas DataFrame or dataFrame iterator)\n", - " write - write one or more DataFrames into the backend\n", - " execute - execute a command on the backend, each backend may support multiple commands \n", - "``` \n", + "client.read?\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "## Initialization\n", "\n", - "Content:\n", - "- [Working with key/value and SQL data](kv)\n", - "- [Working with Time-series data](#tsdb)\n", - "- [Working with Streams](#stream)\n", + "To use V3IO Frames, first ensure that your platform tenant has a shared tenant-wide instance of the V3IO Frames service.\n", + "This can be done by a platform service administrator from the **Services** dashboard page.
\n", + "Then, import the required libraries and create a Frames client object (an instance of the `Client` class), as demonstrated in the following code, which creates a client object named `client`.\n", "\n", - "The following sections describe how to use frames, for more help and details use the internal documentation, e.g. run the following command\n", - "``` client.read?```\n" + "> **Note:**\n", + "> - The client constructor's `container` parameter is set to `\"users\"` for accessing data in the platform's \"users\" data container.\n", + "> - Because no authentication credentials are passed to the constructor, Frames will use the access token that's assigned to the `V3IO_ACCESS_KEY` environment variable.\n", + "> The platform's Jupyter Notebook service defines this variable automatically and initializes it to an access token for the running user of the service.\n", + "> You can pass different credentials by using the constructor's `token` parameter (platform access token) or `user` and `password` parameters (platform username and password)." ] }, { @@ -37,23 +69,72 @@ "import pandas as pd\n", "import v3io_frames as v3f\n", "import os\n", - "client = v3f.Client('framesd:8081', container='users')" + "\n", + "# Create a Frames client\n", + "client = v3f.Client(\"framesd:8081\", container=\"users\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "## Working with NoSQL Tables (\"kv\" Backend)\n", + "\n", + "This section demonstrates how to use the `\"kv\"` Frames backend to write and read NoSQL data in the platform.\n", + "\n", + "- [Initialization](#frames-kv-init)\n", + "- [Load Data from Amazon S3](frames-kv-load-data-s3)\n", + "- [Write to a NoSQL Table](#frames-kv-write)\n", + "- [Read from the Table Using an SQL Query](#frames-kv-read-sql-query)\n", + "- [Read from the Table Using the Frames API](#frames-kv-read-frames-api)\n", + " - [Read Using a Single DataFrame](#frames-kv-read-frames-api-single-df)\n", + " - [Read Using a DataFrame Iterator (Streaming)](#frames-kv-read-frames-api-df-iterator)\n", + "- [Write Data Using an Update Expression](#frames-kv-write-update-expression)\n", + " - [Use the Write Method to Perform a Batch Update](#frames-kv-write-expression-batch-update)\n", + " - [Use the Update Method's Execute Command to Update a Single Item](#frames-kv-write-expression--singe-item-update-w-execute-update-cmd)\n", + "- [Delete the NoSQL Table](#frames-kv-delete)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "\n", - "## Working with key/value and SQL data\n", + "\n", + "### Initialization\n", "\n", - "### Load data from Amazon S3" + "Start out by defining table-path variables that will be used in the tutorial's code examples.
\n", + "The table path (`table`) is relative to the configured parent data container; see [Write to a NoSQL Table](#frames-kv-write)." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, + "outputs": [], + "source": [ + "# Relative path to the NoSQL table within the parent platform data container\n", + "table = os.path.join(os.getenv(\"V3IO_USERNAME\") + \"/examples/bank\")\n", + "\n", + "# Full path to the NoSQL table for SQL queries (platform Presto data-path syntax);\n", + "# use the same data container as used for the Frames client (\"users\")\n", + "sql_table_path = 'v3io.users.\"' + table + '\"'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "### Load Data from Amazon S3\n", + "\n", + "Read a file from an Amazon Simple Storage (S3) bucket into a Frames pandas DataFrame." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, "outputs": [ { "data": { @@ -216,15 +297,14 @@ "4 unknown 5 may 226 1 -1 0 unknown no " ] }, - "execution_count": 2, + "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "# read S3 file into a data frame and show its data & metadata\n", - "tablename = os.path.join(os.getenv('V3IO_USERNAME')+'/examples/bank')\n", - "df = pd.read_csv('https://s3.amazonaws.com/iguazio-sample-data/bank.csv', sep=';')\n", + "# Read an AWS S3 file into a DataFrame and show its data and metadata\n", + "df = pd.read_csv(\"https://s3.amazonaws.com/iguazio-sample-data/bank.csv\", sep=\";\")\n", "df.head()" ] }, @@ -232,33 +312,42 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Write data frames into the database using a single command\n", - "data is streamed into the database via fast NoSQL APIs, note the backend is `kv`
\n", - "the input data can be a single dataframe or a dataframe iterator (for streaming)" + "\n", + "### Write to a NoSQL Table\n", + "\n", + "Use the `write` method of the Frames client with the `\"kv\"` backend to write the data that was read in the previous step to a NoSQL table.
\n", + "The mandatory `table` parameter specifies the relative table path within the data container that was configured for the Frames client (see the [main initialization](#frames-init) step).\n", + "In the following example, the relative table path is set by using the `table` variable that was defined in the [\"kv\" backend initialization](#frames-kv-init) step.
\n", + "The `dfs` parameter can be set either to a single DataFrame (as done in the following example) or to multiple DataFrames — either as a DataFrame iterator or as a list of DataFrames." ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ - "out = client.write('kv', tablename, df)" + "out = client.write(\"kv\", table=table, dfs=df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Read from the Database with DB side SQL\n", - "offload data filtering, grouping, joins, etc to a scale-out high speed DB engine
\n", - "Note that we're using a V3IO_USERNAME as environment variable as therefore we need to define the string for the \"From\" section
\n", - "The from convention is select * from v3io..\"path\"" + "\n", + "### Read from the Table Using an SQL Query\n", + "\n", + "You can run SQL queries on your NoSQL table (using Presto) to offload data filtering, grouping, joins, etc. to a scale-out high-speed database engine.\n", + "\n", + "> **Note:** To query a table in a platform data container, the table path in the `from` section of the SQL query should be of the format `v3io..\"/path/to/table\"`.\n", + "> See [Presto Data Paths](https://www.iguazio.com/docs/tutorials/latest-release/getting-started/fundamentals/#data-paths-presto) in the platform documentation.\n", + "> In the following example, the path is set by using the `sql_table_path` variable that was defined in the [\"kv\" backend initialization](#frames-kv-init) step.\n", + "> Unless you changed the code, this variable translates to `v3io.users.\"/examples/bank\"`; for example, `v3io.users.\"iguazio/examples/bank\"` for user \"iguazio\"." ] }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 5, "metadata": {}, "outputs": [ { @@ -295,68 +384,83 @@ " no\n", " secondary\n", " 0\n", - " yes\n", + " no\n", " unknown\n", - " 249\n", + " 219\n", " married\n", " no\n", - " 19317\n", - " aug\n", - " cellular\n", - " 1\n", - " yes\n", + " 26452\n", + " jul\n", + " telephone\n", + " 2\n", + " no\n", " retired\n", - " 4\n", - " 68\n", + " 15\n", + " 75\n", " -1\n", " \n", " \n", " no\n", " secondary\n", " 0\n", - " no\n", + " yes\n", " unknown\n", - " 219\n", + " 249\n", " married\n", " no\n", - " 26452\n", - " jul\n", - " telephone\n", - " 2\n", - " no\n", + " 19317\n", + " aug\n", + " cellular\n", + " 1\n", + " yes\n", " retired\n", - " 15\n", - " 75\n", + " 4\n", + " 68\n", " -1\n", " \n", "" ], "text/plain": [ - "[('no', 'secondary', 0, 'yes', 'unknown', 249, 'married', 'no', 19317, 'aug', 'cellular', 1, 'yes', 'retired', 4, 68, -1),\n", - " ('no', 'secondary', 0, 'no', 'unknown', 219, 'married', 'no', 26452, 'jul', 'telephone', 2, 'no', 'retired', 15, 75, -1)]" + "[('no', 'secondary', 0, 'no', 'unknown', 219, 'married', 'no', 26452, 'jul', 'telephone', 2, 'no', 'retired', 15, 75, -1),\n", + " ('no', 'secondary', 0, 'yes', 'unknown', 249, 'married', 'no', 19317, 'aug', 'cellular', 1, 'yes', 'retired', 4, 68, -1)]" ] }, - "execution_count": 4, + "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "table_path = os.path.join('v3io.users.\"'+os.getenv('V3IO_USERNAME')+'/examples/bank\"')\n", - "%sql select * from $table_path where balance > 10000" + "%sql select * from $sql_table_path where balance > 10000" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Read the data through frames API\n", - "the frames API returns a dataframe or a dataframe iterator (a stream)
" + "\n", + "### Read from the Table Using the Frames API\n", + "\n", + "Use the `read` method of the Frames client with the `\"kv\"` backend to read data from your NoSQL table.
\n", + "The `read` method can return a DataFrame or a DataFrame iterator (a stream), as demonstrated in the following examples.\n", + "\n", + "- [Read Using a Single DataFrame](#frames-kv-read-frames-api-single-df)\n", + "- [Read Using a DataFrame Iterator (Streaming)](#frames-kv-read-frames-api-df-iterator)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "#### Read Using a Single DataFrame\n", + "\n", + "The following example uses a single command to read data from the NoSQL table into a DataFrame." ] }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 6, "metadata": {}, "outputs": [ { @@ -380,26 +484,26 @@ " \n", " \n", " \n", - " housing\n", - " contact\n", - " education\n", - " loan\n", + " age\n", + " balance\n", " campaign\n", - " pdays\n", - " poutcome\n", + " contact\n", + " day\n", " default\n", - " balance\n", " duration\n", - " previous\n", + " education\n", + " housing\n", " job\n", + " loan\n", " marital\n", " month\n", - " day\n", - " age\n", + " pdays\n", + " poutcome\n", + " previous\n", " y\n", " \n", " \n", - " __name\n", + " index\n", " \n", " \n", " \n", @@ -422,22 +526,22 @@ " \n", " \n", " 75\n", - " no\n", - " telephone\n", - " secondary\n", - " no\n", + " 75.0\n", + " 26452.0\n", " 2.0\n", - " -1.0\n", - " unknown\n", + " telephone\n", + " 15.0\n", " no\n", - " 26452.0\n", " 219.0\n", - " 0.0\n", + " secondary\n", + " no\n", " retired\n", + " no\n", " married\n", " jul\n", - " 15.0\n", - " 75.0\n", + " -1.0\n", + " unknown\n", + " 0.0\n", " no\n", " \n", " \n", @@ -445,22 +549,22 @@ "" ], "text/plain": [ - " housing contact education loan campaign pdays poutcome default \\\n", - "__name \n", - "75 no telephone secondary no 2.0 -1.0 unknown no \n", + " age balance campaign contact day default duration education \\\n", + "index \n", + "75 75.0 26452.0 2.0 telephone 15.0 no 219.0 secondary \n", "\n", - " balance duration previous job marital month day age y \n", - "__name \n", - "75 26452.0 219.0 0.0 retired married jul 15.0 75.0 no " + " housing job loan marital month pdays poutcome previous y \n", + "index \n", + "75 no retired no married jul -1.0 unknown 0.0 no " ] }, - "execution_count": 5, + "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "df = client.read(backend='kv', table=tablename, filter=\"balance>20000\")\n", + "df = client.read(backend=\"kv\", table=table, filter=\"balance > 20000\")\n", "df.head(8)" ] }, @@ -468,36 +572,36 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Read the data as a stream iterator\n", - "to use iterator and allow cuncurent data movement and processing add `iterator=True`, you will need to iterate over the returned value or use `concat`\n", - "iterators work with all backends (not just stream), they allow streaming when placed as an input to write functions which support iterators as input" + "\n", + "#### Read Using a DataFrame Iterator (Streaming)\n", + "\n", + "The following example uses a DataFrame iterator to stream data from the NoSQL table into multiple DataFrames and allow concurrent data movement and processing.
\n", + "The example sets the `iterator` parameter to `True` to receive a DataFrame iterator (instead of the default single DataFrame), and then iterates the DataFrames in the returned iterator; you can also use `concat` instead of iterating the DataFrames.\n", + "\n", + "> **Note:** Iterators work with all Frames backends and can be used as input to write functions that support this, such as the `write` method of the Frames client." ] }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - " balance campaign marital default loan contact y age \\\n", - "__name \n", - "75 26452.0 2.0 married no no telephone no 75.0 \n", + " age balance campaign contact day default duration education \\\n", + "index \n", + "75 75.0 26452.0 2.0 telephone 15.0 no 219.0 secondary \n", "\n", - " duration previous day housing pdays education job poutcome \\\n", - "__name \n", - "75 219.0 0.0 15.0 no -1.0 secondary retired unknown \n", - "\n", - " month \n", - "__name \n", - "75 jul \n" + " housing job loan marital month pdays poutcome previous y \n", + "index \n", + "75 no retired no married jul -1.0 unknown 0.0 no \n" ] } ], "source": [ - "dfs = client.read(backend='kv', table=tablename, filter=\"balance>20000\", iterator=True)\n", + "dfs = client.read(backend=\"kv\", table=table, filter=\"balance > 20000\", iterator=True)\n", "for df in dfs:\n", " print(df.head())" ] @@ -506,108 +610,139 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Batch updates with expression\n", - "in many cases we want to update specific column values or update a column using an expression (e.g. counter = counter + x)
\n", - "when using the key/value backend it can run an expression against each of the rows (specified in the index), and use the dataframe columns as parameters
\n", - "columns are specified using `{}`, e.g. specifing `expression=\"packets=packets+{pkt};bytes=bytes+{bytes};last_update={mytime}\"` will add the data in `pkt` and `bytes` column from the input dataframe to the `packets` and `bytes` columns in the row and set the `last_update` field to `mytime`. the rows are selected based on the input dataframe index" + "\n", + "### Write Data Using an Update Expression\n", + "\n", + "In many cases, it's useful to update specific attributes (columns) by using an update expression (for example, `counter = counter + 1`).\n", + "The `write` method and the `update` command of the `execute` method of the Frames client support an optional `expression` parameter for the `\"kv\"` backend, which can be set to a [platform update expression](https://www.iguazio.com/docs/reference/latest-release/expressions/update-expression/).\n", + "The difference is that `write` applies the expression to all the DataFrame items (rows) while `update` applies the expression only to a single item, as explained in the following examples.\n", + "\n", + "In Frames update expressions, attributes (columns) in the written DataFrame are embedded within curly braces (`{ATTRIBUTE}`); attributes in the target table are specified simply by their names (`ATTRIBUTE`), as with all platform expressions.\n", + "For example, `expression=\"packets=packets+{pkt}; bytes=bytes+{bytes}; last_update={mytime}\"` updates the values of the `packets` and `bytes` attributes in the table item by adding to their current values the values of the `pkt` and `bytes` DataFrame columns, and sets the value of the `last_update` attribute in the table item to the value of the `mytime` DataFrame column (creating the attribute if it doesn't already exist in the table item).\n", + "\n", + "> **Note:**\n", + "> - When setting the expression parameter, Frames doesn't update the table schema (unlike in standard writes).\n", + "> - Both the `write` method and the `update` command of the `execute` method also support an optional `condition` parameter for the `\"kv\"` backend.
\n", + "> This parameter can be set to a [platform condition expression](https://www.iguazio.com/docs/reference/latest-release/expressions/condition-expression/) to perform a conditional update — i.e., only update or create new items if specific conditions are met.\n", + "> Note that when the condition expression references a non-existing attribute, the condition evaluates to `false`.\n", + "\n", + "- [Use the Write Method to Perform a Batch Update](#frames-kv-write-expression-batch-update)\n", + "- [Use the Update Method's Execute Command to Update a Single Item](#frames-kv-write-expression-single-item-update-w-execute-update-cmd)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "#### Use the Write Method to Perform a Batch Update\n", + "\n", + "The `write` method applies the update expression of the `expression` parameter to all items in the DataFrame (\"batch\" update); i.e., all table items (rows) whose primary-key attribute (index-column) values match those of the DataFrame items are updated, and items that don't exist in the table are created." ] }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "# example: creating a new column which reflect the delta between the old `balance` column and the one provided in df (should result in 0 since df didnt change)\n", - "out = client.write('kv', tablename, df, expression='balance_delta=balance-{balance}')" + "# Add a new \"balance_delta\" attribute (column) to all table items (rows) and set its value to the difference (delta) between the\n", + "# current value of the \"balance\" attribute in the table and the value provided for this attribute in the DataFrame.\n", + "# Because the value of \"balance\" in the DataFrame wasn't modified since it was written to the table, the attribute value that is written to table (for all items) should be 0.\n", + "out = client.write(\"kv\", table, df, expression=\"balance_delta = balance - {balance}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Making a single row update using execute command\n", - "The use of `condition` is optional and allow to implement safe/conditional transactions " + "\n", + "#### Use the Update Method's Execute Command to Update a Single Item\n", + "\n", + "The `update` command of the `execute` method updates or creates a single item whose primary-key attribute (index-column) value is specified in the command's `key` parameter, as demonstrated in the following example.\n", + "The example also uses the optional `condition` parameter to perform the update only if the specified condition is met." ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
\n", - "
" - ], - "text/plain": [ - "Empty DataFrame\n", - "Columns: []\n", - "Index: []" - ] - }, - "execution_count": 8, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ - "client.execute('kv',tablename,'update', args={'key':'44', 'expression': 'age=44', 'condition':'balance>0'})" + "# Conditionally update the table item whose primary-key attribute (index-column) value is 44 (`key`) and\n", + "# set its \"age\" attribute to 44, provided the value of the item's \"balance\" attribute is greater than 0.\n", + "client.execute(\"kv\", table, \"update\", args={\"key\": \"44\", \"expression\": \"age=44\", \"condition\": \"balance > 0\"})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Delete the table\n", - "note: in kv (NoSQL) tabels there is no need to create a table before using it" + "\n", + "### Delete the NoSQL Table\n", + "\n", + "Use the `delete` method of the Frames client with the `\"kv\"` backend to delete the NoSQL table that was used in the previous steps." ] }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 8, "metadata": {}, "outputs": [], "source": [ - "client.delete('kv',table=tablename)" + "# Delete the `table` NoSQL table\n", + "client.delete(\"kv\", table)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "\n", - "## Working with time-series data" + "\n", + "## Working with Time-Series Databases (\"tsdb\" Backend)\n", + "\n", + "This section demonstrates how to use the `\"tsdb\"` Frames backend to create a time-series database (TSDB) table in the platform, ingest data into the table, and read from the table (i.e., submit TSDB queries).\n", + "\n", + "- [Initialization](#frames-tsdb-init)\n", + "- [Create a TSDB Table](#frames-tsdb-create)\n", + "- [Write to the TSDB Table](#frames-tsdb-write)\n", + "- [Read from the TSDB Table](#frames-tsdb-read)\n", + "- [Delete the TSDB Table](#frames-tsdb-delete)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Note that the tsdb table example will be created under the root of the \"users\" container" + "\n", + "### Initialization\n", + "\n", + "Start out by defining a TSDB table-path variable that will be used in the tutorial's code examples.
\n", + "The table path (`tsdb_table`) is relative to the configured parent data container; see [Create a TSDB Table](#frames-tsdb-create)." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "# Relative path to the TSDB table within the parent platform data container\n", + "tsdb_table = os.path.join(os.getenv(\"V3IO_USERNAME\") + \"/examples/tsdb_tab\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "### Create a TSDB Table\n", + "\n", + "Use the `create` method of the Frames client with the `\"tsdb\"` backend to create a new TSDB table.
\n", + "The mandatory `table` parameter specifies the relative table path within the data container that was configured for the Frames client (see the [main initialization](#frames-init) step).\n", + "In the following example, the relative table path is set by using the `tsdb_table` variable that was defined in the [\"tsdb\" backend initialization](#frames-tsdb-init) step.
\n", + "You can optionally use the `attrs` parameter to provide additional arguments.\n", + "For example, you can set the `rate` argument to the TSDB’s metric-samples ingestion rate (`\"[0-9]+/[smh]\"`; for example, `1/s`); the rate should be calculated according to the slowest expected ingestion rate." ] }, { @@ -616,8 +751,33 @@ "metadata": {}, "outputs": [], "source": [ - "# create a time series table, rate specifies the typical ingestion rate (e.g. one sample per minute)\n", - "client.create(backend='tsdb', table='tsdb_tab',attrs={'rate':'1/m'})" + "# Create a new TSDB table; ingestion rate = one sample per minute (\"1/m\")\n", + "client.create(backend=\"tsdb\", table=tsdb_table, attrs={\"rate\": \"1/m\"})" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "### Write to the TSDB Table\n", + "\n", + "Use the `write` method of the Frames client with the `\"tsdb\"` backend to ingest data from a pandas DataFrame into your TSDB table.
\n", + "The primary-key attribute of platform TSDB tables (i.e., the DataFrame index column) must hold the sample time of the data (displayed as `time` in read outputs).
\n", + "In addition, TSDB table items (rows) can optionally have sub-index columns (attributes) that are called labels.\n", + "You can add labels to TSDB table items in one of two ways; you can also combine these methods:\n", + "\n", + "- Use the `labels` dictionary parameter of the `write` method to add labels to all the written metric-sample table items (DataFrame rows) — `{\"