[BACKLOG-35153] Orc/Parquet support for Azure Datalake Gen1 VFS plugin #1207

Open

wants to merge 1 commit into base: 9.2
6 changes: 6 additions & 0 deletions shims/apache/driver/pom.xml
@@ -191,6 +191,11 @@
<artifactId>hadoop-azure</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure-datalake</artifactId>
<version>3.3.0</version>
</dependency>
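<!-- hadoop-azure-datalake supplies org.apache.hadoop.fs.adl.AdlFileSystem,
     the connector behind the adl:// scheme that ADLSGen1Conf relies on;
     it is version-matched to the existing hadoop-azure 3.3.0 dependency above. -->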
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -253,6 +258,7 @@
*:com.fasterxml.jackson.dataformat,*:hadoop-mapreduce-client-core,*:hadoop-common,*:orc-core,*:avro,
*:hadoop-aws,
*:hadoop-azure,
*:hadoop-azure-datalake,
com.amazonaws:*,
*:parquet-hadoop-bundle,*:hadoop-hdfs,
*:avro-mapred,
PvfsHadoopBridge.java

@@ -41,6 +41,7 @@
import org.pentaho.hadoop.shim.pvfs.api.PvfsHadoopBridgeFileSystemExtension;

import org.pentaho.hadoop.shim.pvfs.conf.ADLSGen2Conf;
import org.pentaho.hadoop.shim.pvfs.conf.ADLSGen1Conf;
import org.pentaho.hadoop.shim.pvfs.conf.GcsConf;
import org.pentaho.hadoop.shim.pvfs.conf.HCPConf;
import org.pentaho.hadoop.shim.pvfs.conf.PvfsConf;
@@ -75,7 +76,7 @@ public class PvfsHadoopBridge extends FileSystem implements PvfsHadoopBridgeFileSystemExtension {

@SuppressWarnings( "unused" )
public PvfsHadoopBridge() {
confFactories = Arrays.asList( S3Conf::new, HCPConf::new, SnwConf::new, GcsConf::new, ADLSGen2Conf::new );
confFactories = Arrays.asList( S3Conf::new, HCPConf::new, SnwConf::new, GcsConf::new, ADLSGen2Conf::new, ADLSGen1Conf::new );
connMgr = ConnectionManager.getInstance();
}

ADLSGen1Conf.java (new file)

@@ -0,0 +1,141 @@
/*******************************************************************************
*
* Pentaho Big Data
*
* Copyright (C) 2021 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.hadoop.shim.pvfs.conf;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.adl.AdlConfKeys;
import org.apache.hadoop.fs.adl.AdlFileSystem;
import org.pentaho.di.connections.ConnectionDetails;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.google.common.base.Strings.isNullOrEmpty;
import static org.apache.hadoop.fs.Path.SEPARATOR;
import static org.pentaho.hadoop.shim.pvfs.PvfsHadoopBridge.getConnectionName;

public class ADLSGen1Conf extends PvfsConf {

private static final String AZURE_AUTH_TYPE = "fs.adl.oauth2.access.token.provider.type";

private final String accountFQDN;
private final String scheme;
private String clientId;
private String clientSecret;
private String authTokenEndpoint;

public ADLSGen1Conf( ConnectionDetails details ) {
super( details );
try ( AdlFileSystem adlFileSystem = new AdlFileSystem() ) {
scheme = adlFileSystem.getScheme();
accountFQDN = details.getProperties().get( "accountFQDN" );
if ( isServiceToServiceAuthentication( details.getProperties().get( "clientId" ),
details.getProperties().get( "clientSecret" ), details.getProperties().get( "authTokenEndpoint" ) ) ) {
clientId = details.getProperties().get( "clientId" );
clientSecret = details.getProperties().get( "clientSecret" );
authTokenEndpoint = details.getProperties().get( "authTokenEndpoint" );
}
} catch ( IOException e ) {
throw new IllegalStateException( e );
}
}

@Override
public boolean supportsConnection() {
return scheme.equalsIgnoreCase( details.getType() );
}

@Override
public Path mapPath( Path pvfsPath ) {
validatePath( pvfsPath );
String[] splitPath = pvfsPath.toUri().getPath().split( "/" );

Preconditions.checkArgument( splitPath.length > 0 );
String path = SEPARATOR + Arrays.stream( splitPath ).skip( 1 ).collect( Collectors.joining( SEPARATOR ) );
try {
return new Path( new URI( scheme, accountFQDN, path, null ) );
} catch ( URISyntaxException e ) {
throw new IllegalStateException( e );
}
}

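// Inverse of the mapping above: rebuild a pvfs:// path from the real adl:// path,
// preserving any userInfo component the real URI carries.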
@Override
public Path mapPath( Path pvfsPath, Path realFsPath ) {
URI uri = realFsPath.toUri();
String userInfo = uri.getUserInfo();
if ( uri.getUserInfo() == null ) {
userInfo = "";
}
return new Path( pvfsPath.toUri().getScheme(),
getConnectionName( pvfsPath ), "/" + userInfo + uri.getPath() );
}

@Override
public Configuration conf( Path pvfsPath ) {
Configuration config = new Configuration();
/*
 * Azure connector configurations are documented here:
 * https://hadoop.apache.org/docs/r2.8.0/hadoop-azure-datalake/index.html
 */
config.set( "fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem" );
config.set( "fs.AbstractFileSystem.adl.impl", "org.apache.hadoop.fs.adl.Adl" );
if ( isServiceToServiceAuthentication( clientId, clientSecret, authTokenEndpoint ) ) {
config.set( AZURE_AUTH_TYPE, AdlConfKeys.TOKEN_PROVIDER_TYPE_CLIENT_CRED );
config.set( "fs.adl.oauth2.refresh.url", authTokenEndpoint );
config.set( "fs.adl.oauth2.client.id", clientId );
config.set( "fs.adl.oauth2.credential", clientSecret );
}
return config;
}

private boolean isServiceToServiceAuthentication( String clientId, String clientSecret, String authTokenEndpoint ) {
return !isNullOrEmpty( clientId ) && !isNullOrEmpty( clientSecret ) && !isNullOrEmpty( authTokenEndpoint );
}

@Override
public boolean equals( Object o ) {
if ( this == o ) {
return true;
}
if ( o == null || getClass() != o.getClass() ) {
return false;
}
if ( !super.equals( o ) ) {
return false;
}
ADLSGen1Conf adlsConf = ( ADLSGen1Conf ) o;
return Objects.equals( accountFQDN, adlsConf.accountFQDN );
}

@Override
public int hashCode() {
return Objects.hash( super.hashCode(), accountFQDN );
}
}
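As a worked example of the URI surgery mapPath performs (a sketch only; the connection name and account FQDN are the same placeholders the unit test below uses):

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.stream.Collectors;

public class MapPathSketch {
  public static void main( String[] args ) throws URISyntaxException {
    // A pvfs URI: scheme "pvfs", authority = connection name, path = /mockContainer/mockFile.txt
    URI pvfs = new URI( "pvfs://gen1Conn/mockContainer/mockFile.txt" );

    // mapPath splits the path, skips the leading empty segment, and rejoins it,
    // then swaps in the adl scheme and the account FQDN as the authority.
    String[] split = pvfs.getPath().split( "/" ); // ["", "mockContainer", "mockFile.txt"]
    String path = "/" + Arrays.stream( split ).skip( 1 ).collect( Collectors.joining( "/" ) );

    URI adl = new URI( "adl", "mockAccountName.azuredatalakestore.net", path, null );
    System.out.println( adl ); // adl://mockAccountName.azuredatalakestore.net/mockContainer/mockFile.txt
  }
}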
META-INF/services/org.apache.hadoop.fs.FileSystem

@@ -1,3 +1,4 @@
org.pentaho.hadoop.shim.pvfs.PvfsHadoopBridge
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem
org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem
org.apache.hadoop.fs.adl.AdlFileSystem
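The added line registers AdlFileSystem with Hadoop's ServiceLoader-based FileSystem discovery (this file is a standard java.util.ServiceLoader provider list), so an adl:// URI can resolve to the right implementation even before conf() sets fs.adl.impl explicitly. A minimal sketch of that resolution, assuming the OAuth2 settings from ADLSGen1Conf.conf() are already present in the Configuration (the account host is a placeholder):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class AdlResolveSketch {
  public static void main( String[] args ) throws Exception {
    Configuration conf = new Configuration();
    // In real use, the fs.adl.oauth2.* credentials must be set here,
    // as ADLSGen1Conf.conf() does, or AdlFileSystem initialization will fail.
    FileSystem fs = FileSystem.get( URI.create( "adl://mockAccountName.azuredatalakestore.net/" ), conf );
    System.out.println( fs.getClass().getName() ); // org.apache.hadoop.fs.adl.AdlFileSystem
  }
}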
ADLSGen1ConfTest.java (new file)

@@ -0,0 +1,121 @@
/*******************************************************************************
*
* Pentaho Big Data
*
* Copyright (C) 2021 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package org.pentaho.hadoop.shim.pvfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.pentaho.di.connections.ConnectionDetails;
import org.pentaho.hadoop.shim.pvfs.conf.ADLSGen1Conf;
import org.pentaho.hadoop.shim.pvfs.conf.S3Conf;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.*;
import static org.mockito.Mockito.when;

@RunWith( MockitoJUnitRunner.class )
public class ADLSGen1ConfTest {

private ADLSGen1Conf adlsGen1Conf;
private ADLSGen1Conf badADLSGen1Conf;
private S3Conf s3Conf;
private Path path;
@Mock private ConnectionDetails adlsGen1Conn;
@Mock private ConnectionDetails otherADLS1Conn;
@Mock private ConnectionDetails hcpConn;
@Mock private ConnectionDetails s3Conn;
private Map<String, String> adlsGen1Props = new HashMap<>();
private Map<String, String> s3Props = new HashMap<>();

@Before
public void before() {
adlsGen1Props.put( "accountFQDN", "mockAccountName.azuredatalakestore.net" );
adlsGen1Props.put( "clientId", "mOckSharedKey==" );
adlsGen1Props.put( "clientSecret", "mockAccountName" );
adlsGen1Props.put( "authTokenEndpoint", "https://login.microsoftonline.com/123/oauth2/token" );
when( adlsGen1Conn.getProperties() ).thenReturn( adlsGen1Props );
when( adlsGen1Conn.getType() ).thenReturn( "adl" );
adlsGen1Conf = new ADLSGen1Conf( adlsGen1Conn );


path = new Path( "pvfs://gen1Conn/mockContainer/mockFile.txt" );

s3Props.put( "accessKey", "ACCESSKEY" );
s3Props.put( "secretKey", "SECRETKEY" );
when( s3Conn.getProperties() ).thenReturn( s3Props );
when( s3Conn.getType() ).thenReturn( "s3" );
s3Conf = new S3Conf( s3Conn );

// stub an empty property map so the constructor's property lookups don't NPE
when( hcpConn.getProperties() ).thenReturn( new HashMap<>() );
when( hcpConn.getType() ).thenReturn( "hcp" );
badADLSGen1Conf = new ADLSGen1Conf( hcpConn );
}

@Test
public void testSupportedSchemes() {
assertTrue( adlsGen1Conf.supportsConnection() );
assertFalse( badADLSGen1Conf.supportsConnection() );
}

@Test public void mapPath() {
Path result = adlsGen1Conf.mapPath( path );
assertEquals( "adl://mockAccountName.azuredatalakestore.net/mockContainer/mockFile.txt", result.toString() );
}

@Test public void mapPathWithSpaces() {
Path pathWithSpaces = new Path( "pvfs://gen1 Conn/mockContainer/mockFile.txt" );
Path result = adlsGen1Conf.mapPath( pathWithSpaces );
assertEquals( "adl://mockAccountName.azuredatalakestore.net/mockContainer/mockFile.txt", result.toString() );
}

@Test public void testConf() {
Configuration conf = adlsGen1Conf.conf( path );
assertEquals( "org.apache.hadoop.fs.adl.AdlFileSystem", conf.get( "fs.adl.impl" ) );
assertEquals( "org.apache.hadoop.fs.adl.Adl", conf.get( "fs.AbstractFileSystem.adl.impl" ) );
}

@Test public void testEquals() {
assertNotEquals( null, adlsGen1Conf );
assertEquals( adlsGen1Conf, adlsGen1Conf );
assertNotEquals( adlsGen1Conf, s3Conf );
when( otherADLS1Conn.getProperties() ).thenReturn( new HashMap<>( adlsGen1Props ) );
when( otherADLS1Conn.getType() ).thenReturn( "adl" );

ADLSGen1Conf otherGen1Conf = new ADLSGen1Conf( otherADLS1Conn );

assertEquals( otherGen1Conf, adlsGen1Conf );
// change the auth credentials so the underlying property maps differ
otherADLS1Conn.getProperties().put( "clientSecret", "otherMockClientSecret" );
assertNotEquals( otherGen1Conf, adlsGen1Conf );

assertNotEquals( adlsGen1Conf.hashCode(), s3Conf.hashCode() );
assertNotEquals( adlsGen1Conf.hashCode(), otherGen1Conf.hashCode() );
}
}