From 408635ea633a0e43602ed4e5b7dfad60c548230f Mon Sep 17 00:00:00 2001 From: Patitapaban Mohapatra Date: Mon, 17 May 2021 21:59:45 +0530 Subject: [PATCH] [BACKLOG-35153] Orc/Parquet support for Azure Datalake Gen1 VFS plugin --- shims/apache/driver/pom.xml | 6 + .../hadoop/shim/pvfs/PvfsHadoopBridge.java | 3 +- .../hadoop/shim/pvfs/conf/ADLSGen1Conf.java | 141 ++++++++++++++++++ .../services/org.apache.hadoop.fs.FileSystem | 3 +- .../hadoop/shim/pvfs/ADLSGen1ConfTest.java | 121 +++++++++++++++ 5 files changed, 272 insertions(+), 2 deletions(-) create mode 100644 shims/apache/driver/src/main/java/org/pentaho/hadoop/shim/pvfs/conf/ADLSGen1Conf.java create mode 100644 shims/apache/driver/src/test/java/org/pentaho/hadoop/shim/pvfs/ADLSGen1ConfTest.java diff --git a/shims/apache/driver/pom.xml b/shims/apache/driver/pom.xml index 0fcebf5c9ff..cfd3e505d69 100644 --- a/shims/apache/driver/pom.xml +++ b/shims/apache/driver/pom.xml @@ -191,6 +191,11 @@ hadoop-azure 3.3.0 + + org.apache.hadoop + hadoop-azure-datalake + 3.3.0 + com.google.guava guava @@ -253,6 +258,7 @@ *:com.fasterxml.jackson.dataformat,*:hadoop-mapreduce-client-core,*:hadoop-common,*:orc-core,*:avro, *:hadoop-aws, *:hadoop-azure, + *:hadoop-azure-datalake, com.amazonaws:*, *:parquet-hadoop-bundle,*:hadoop-hdfs, *:avro-mapred, diff --git a/shims/apache/driver/src/main/java/org/pentaho/hadoop/shim/pvfs/PvfsHadoopBridge.java b/shims/apache/driver/src/main/java/org/pentaho/hadoop/shim/pvfs/PvfsHadoopBridge.java index b0b141d4510..205194c2909 100644 --- a/shims/apache/driver/src/main/java/org/pentaho/hadoop/shim/pvfs/PvfsHadoopBridge.java +++ b/shims/apache/driver/src/main/java/org/pentaho/hadoop/shim/pvfs/PvfsHadoopBridge.java @@ -41,6 +41,7 @@ import org.pentaho.hadoop.shim.api.format.org.pentaho.hadoop.shim.pvfs.api.PvfsHadoopBridgeFileSystemExtension; import org.pentaho.hadoop.shim.pvfs.conf.ADLSGen2Conf; +import org.pentaho.hadoop.shim.pvfs.conf.ADLSGen1Conf; import org.pentaho.hadoop.shim.pvfs.conf.GcsConf; import org.pentaho.hadoop.shim.pvfs.conf.HCPConf; import org.pentaho.hadoop.shim.pvfs.conf.PvfsConf; @@ -75,7 +76,7 @@ public class PvfsHadoopBridge extends FileSystem implements PvfsHadoopBridgeFile @SuppressWarnings( "unused" ) public PvfsHadoopBridge() { - confFactories = Arrays.asList( S3Conf::new, HCPConf::new, SnwConf::new, GcsConf::new, ADLSGen2Conf::new ); + confFactories = Arrays.asList( S3Conf::new, HCPConf::new, SnwConf::new, GcsConf::new, ADLSGen2Conf::new, ADLSGen1Conf::new ); connMgr = ConnectionManager.getInstance(); } diff --git a/shims/apache/driver/src/main/java/org/pentaho/hadoop/shim/pvfs/conf/ADLSGen1Conf.java b/shims/apache/driver/src/main/java/org/pentaho/hadoop/shim/pvfs/conf/ADLSGen1Conf.java new file mode 100644 index 00000000000..772e952e2de --- /dev/null +++ b/shims/apache/driver/src/main/java/org/pentaho/hadoop/shim/pvfs/conf/ADLSGen1Conf.java @@ -0,0 +1,141 @@ +/******************************************************************************* + * + * Pentaho Big Data + * + * Copyright (C) 2021 by Hitachi Vantara : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.pentaho.hadoop.shim.pvfs.conf; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.adl.AdlConfKeys; +import org.apache.hadoop.fs.adl.AdlFileSystem; +import org.pentaho.di.connections.ConnectionDetails; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Collectors; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static org.apache.hadoop.fs.Path.SEPARATOR; +import static org.pentaho.hadoop.shim.pvfs.PvfsHadoopBridge.getConnectionName; + +public class ADLSGen1Conf extends PvfsConf { + + private static final String AZURE_AUTH_TYPE = "fs.adl.oauth2.access.token.provider.type"; + + private final String accountFQDN; + private final String scheme; + private String clientId; + private String clientSecret; + private String authTokenEndpoint; + + public ADLSGen1Conf(ConnectionDetails details ) { + super( details ); + try ( AdlFileSystem adlFileSystem = new AdlFileSystem() ) { + scheme = adlFileSystem.getScheme(); + accountFQDN = details.getProperties().get( "accountFQDN" ); + if ( isServiceToServiceAuthentication( details.getProperties().get( "clientId" ), + details.getProperties().get( "clientSecret" ), details.getProperties().get( "authTokenEndpoint" ) ) ) { + clientId = details.getProperties().get( "clientId" ); + clientSecret = details.getProperties().get( "clientSecret" ); + authTokenEndpoint = details.getProperties().get( "authTokenEndpoint" ); + } + } catch ( IOException e ) { + throw new IllegalStateException( e ); + } + } + + @Override + public boolean supportsConnection() { + return scheme.equalsIgnoreCase( details.getType() ); + } + + @Override + public Path mapPath( Path pvfsPath ) { + validatePath( pvfsPath ); + String[] splitPath = pvfsPath.toUri().getPath().split( "/" ); + + Preconditions.checkArgument( splitPath.length > 0 ); + String bucket = accountFQDN; + String path = SEPARATOR + Arrays.stream( splitPath ).skip( 1 ).collect( Collectors.joining( SEPARATOR ) ); + try { + return new Path( new URI( scheme, bucket, path, null ) ); + } catch ( URISyntaxException e ) { + throw new IllegalStateException( e ); + } + } + + @Override + public Path mapPath( Path pvfsPath, Path realFsPath ) { + URI uri = realFsPath.toUri(); + String userInfo = uri.getUserInfo(); + if ( uri.getUserInfo() == null ) { + userInfo = ""; + } + return new Path( pvfsPath.toUri().getScheme(), + getConnectionName( pvfsPath ), "/" + userInfo + uri.getPath() ); + } + + @Override + public Configuration conf( Path pvfsPath ) { + Configuration config = new Configuration(); + /** + * Azure Connector configurations can be found here : + * https://hadoop.apache.org/docs/r2.8.0/hadoop-azure-datalake/index.html + */ + config.set( "fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem" ); + config.set( "fs.AbstractFileSystem.adl.impl", "org.apache.hadoop.fs.adl.Adl" ); + if ( !isNullOrEmpty( clientId ) && !isNullOrEmpty( clientSecret ) && !isNullOrEmpty( authTokenEndpoint ) ) { + config.set( AZURE_AUTH_TYPE , AdlConfKeys.TOKEN_PROVIDER_TYPE_CLIENT_CRED ); + config.set( "fs.adl.oauth2.refresh.url", authTokenEndpoint); + config.set( "fs.adl.oauth2.client.id", clientId ); + config.set( "fs.adl.oauth2.credential", clientSecret ); + } + return config; + } + + private boolean isServiceToServiceAuthentication( String clientId, String clientSecret, String authTokenEndpoint ) { + return !isNullOrEmpty( clientId ) && !isNullOrEmpty( clientSecret ) && !isNullOrEmpty( authTokenEndpoint ); + } + + @Override + public boolean equals( Object o ) { + if ( this == o ) { + return true; + } + if ( o == null || getClass() != o.getClass() ) { + return false; + } + if ( !super.equals( o ) ) { + return false; + } + ADLSGen1Conf adlsConf = ( ADLSGen1Conf ) o; + return Objects.equals( accountFQDN, adlsConf.accountFQDN ); + } + + @Override + public int hashCode() { + return Objects.hash( super.hashCode(), accountFQDN ); + } +} diff --git a/shims/apache/driver/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/shims/apache/driver/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem index 5d39ee5503f..7cbae1fcf76 100644 --- a/shims/apache/driver/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem +++ b/shims/apache/driver/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -1,3 +1,4 @@ org.pentaho.hadoop.shim.pvfs.PvfsHadoopBridge org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem -org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem \ No newline at end of file +org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem +org.apache.hadoop.fs.adl.AdlFileSystem \ No newline at end of file diff --git a/shims/apache/driver/src/test/java/org/pentaho/hadoop/shim/pvfs/ADLSGen1ConfTest.java b/shims/apache/driver/src/test/java/org/pentaho/hadoop/shim/pvfs/ADLSGen1ConfTest.java new file mode 100644 index 00000000000..85cbce7ddf0 --- /dev/null +++ b/shims/apache/driver/src/test/java/org/pentaho/hadoop/shim/pvfs/ADLSGen1ConfTest.java @@ -0,0 +1,121 @@ +/******************************************************************************* + * + * Pentaho Big Data + * + * Copyright (C) 2021 by Hitachi Vantara : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ +package org.pentaho.hadoop.shim.pvfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.pentaho.di.connections.ConnectionDetails; +import org.pentaho.hadoop.shim.pvfs.conf.ADLSGen1Conf; +import org.pentaho.hadoop.shim.pvfs.conf.S3Conf; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.*; +import static org.mockito.Mockito.when; + +@RunWith( MockitoJUnitRunner.class ) +public class ADLSGen1ConfTest { + + private ADLSGen1Conf adlsGen1Conf; + private ADLSGen1Conf badADLSGen1Conf; + private S3Conf s3Conf; + private Path path; + @Mock + private ConnectionDetails adlsGen1Conn; + @Mock private ConnectionDetails otherADLS1Conn; + @Mock private ConnectionDetails hcpConn; + @Mock private ConnectionDetails s3Conn; + private Map adlsGen1Props = new HashMap<>(); + private Map s3Props = new HashMap<>(); + + @Before + public void before() { + adlsGen1Props.put( "accountFQDN", "mockAccountName.azuredatalakestore.net" ); + adlsGen1Props.put( "clientId", "mOckSharedKey==" ); + adlsGen1Props.put( "clientSecret", "mockAccountName" ); + adlsGen1Props.put( "authTokenEndpoint", "https://login.microsoftonline.com/123/oauth2/token" ); + when( adlsGen1Conn.getProperties() ).thenReturn( adlsGen1Props ); + when( adlsGen1Conn.getType() ).thenReturn( "adl" ); + adlsGen1Conf = new ADLSGen1Conf( adlsGen1Conn ); + + + path = new Path( "pvfs://gen1Conn/mockContainer/mockFile.txt" ); + + s3Props.put( "accessKey", "ACCESSKEY" ); + s3Props.put( "secretKey", "SECRETKEY" ); + when( s3Conn.getProperties() ).thenReturn( s3Props ); + when( s3Conn.getType() ).thenReturn( "s3" ); + s3Conf = new S3Conf( s3Conn ); + + when( hcpConn.getType() ).thenReturn( "hcp" ); + badADLSGen1Conf = new ADLSGen1Conf( hcpConn ); + } + + @Test + public void testSupportedSchemes() { + assertTrue( adlsGen1Conf.supportsConnection() ); + assertFalse( badADLSGen1Conf.supportsConnection() ); + } + + @Test public void mapPath() { + Path result = adlsGen1Conf.mapPath( path ); + assertEquals( result.toString(), "adl://mockAccountName.azuredatalakestore.net/mockContainer/mockFile.txt" ); + } + + @Test public void mapPathWithSpaces() { + Path pathWithSpaces = new Path( "pvfs://gen1 Conn/mockContainer/mockFile.txt" ); + Path result = adlsGen1Conf.mapPath( pathWithSpaces ); + assertEquals( result.toString(), "adl://mockAccountName.azuredatalakestore.net/mockContainer/mockFile.txt" ); + } + + @Test public void testConf() { + Configuration conf = adlsGen1Conf.conf( path ); + assertEquals( conf.get( "fs.adl.impl" ), "org.apache.hadoop.fs.adl.AdlFileSystem" ); + assertEquals( conf.get( "fs.AbstractFileSystem.adl.impl" ), "org.apache.hadoop.fs.adl.Adl" ); + } + + @Test public void testEquals() { + assertNotEquals( null, adlsGen1Conf ); + assertEquals( adlsGen1Conf, adlsGen1Conf ); + assertNotEquals( adlsGen1Conf, s3Conf ); + when( otherADLS1Conn.getProperties() ).thenReturn( new HashMap<>( adlsGen1Props ) ); + when( otherADLS1Conn.getType() ).thenReturn( "adl" ); + + ADLSGen1Conf otherGen1Conf = new ADLSGen1Conf( otherADLS1Conn ); + + assertEquals( otherGen1Conf, adlsGen1Conf ); + // change auth credentials path + otherADLS1Conn.getProperties().put( "sharedKey", "othermOckSharedKey==" ); + assertNotEquals( otherGen1Conf, adlsGen1Conf ); + + assertNotEquals( adlsGen1Conf.hashCode(), s3Conf.hashCode() ); + assertNotEquals( adlsGen1Conf.hashCode(), otherGen1Conf.hashCode() ); + } +}