Skip to content

Commit

Permalink
Merge branch 'master' into spark_47353_3_clean
Browse files Browse the repository at this point in the history
  • Loading branch information
GideonPotok authored May 28, 2024
2 parents f054589 + 2493900 commit 5d171d6
Show file tree
Hide file tree
Showing 159 changed files with 5,044 additions and 1,395 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_python_connect.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
cd python
python packaging/connect/setup.py sdist
cd dist
pip install pyspark-connect-*.tar.gz
pip install pyspark*connect-*.tar.gz
pip install 'six==1.16.0' 'pandas<=2.2.2' scipy 'plotly>=4.8' 'mlflow>=2.8.1' coverage matplotlib openpyxl 'memory-profiler>=0.61.0' 'scikit-learn>=1.3.2' torch torchvision torcheval deepspeed unittest-xml-reporting
- name: Run tests
env:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void testChunkedStream() throws Exception {

// Validate we read data correctly
assertEquals(bodyResult.readableBytes(), chunkSize);
assert(bodyResult.readableBytes() < (randomData.length - readIndex));
assertTrue(bodyResult.readableBytes() < (randomData.length - readIndex));
while (bodyResult.readableBytes() > 0) {
assertEquals(bodyResult.readByte(), randomData[readIndex++]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ private void transferAllOutstanding() {
if (numRetries > 0) {
logger.error("Exception while beginning {} of {} outstanding blocks (after {} retries)", e,
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length),
MDC.of(LogKeys.NUM_BLOCKS$.MODULE$, blockIdsToTransfer.length),
MDC.of(LogKeys.NUM_RETRY$.MODULE$, numRetries));
} else {
logger.error("Exception while beginning {} of {} outstanding blocks", e,
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length));
MDC.of(LogKeys.NUM_BLOCKS$.MODULE$, blockIdsToTransfer.length));
}
if (shouldRetry(e) && initiateRetry(e)) {
// successfully initiated a retry
Expand Down Expand Up @@ -219,7 +219,7 @@ synchronized boolean initiateRetry(Throwable e) {
MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()),
MDC.of(LogKeys.NUM_RETRY$.MODULE$, retryCount),
MDC.of(LogKeys.MAX_ATTEMPTS$.MODULE$, maxRetries),
MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, outstandingBlocksIds.size()),
MDC.of(LogKeys.NUM_BLOCKS$.MODULE$, outstandingBlocksIds.size()),
MDC.of(LogKeys.RETRY_WAIT_TIME$.MODULE$, retryWaitTime));

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void testRetryOnSaslTimeout() throws IOException, InterruptedException {
verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
verify(listener).getTransferType();
verifyNoMoreInteractions(listener);
assert(_retryingBlockTransferor.getRetryCount() == 0);
assertEquals(0, _retryingBlockTransferor.getRetryCount());
}

@Test
Expand All @@ -310,7 +310,7 @@ public void testRepeatedSaslRetryFailures() throws IOException, InterruptedExcep
verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
verify(listener, times(3)).getTransferType();
verifyNoMoreInteractions(listener);
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
assertEquals(MAX_RETRIES, _retryingBlockTransferor.getRetryCount());
}

@Test
Expand Down Expand Up @@ -339,7 +339,7 @@ public void testBlockTransferFailureAfterSasl() throws IOException, InterruptedE
// This should be equal to 1 because after the SASL exception is retried,
// retryCount should be set back to 0. Then after that b1 encounters an
// exception that is retried.
assert(_retryingBlockTransferor.getRetryCount() == 1);
assertEquals(1, _retryingBlockTransferor.getRetryCount());
}

@Test
Expand Down Expand Up @@ -368,7 +368,7 @@ public void testIOExceptionFailsConnectionEvenWithSaslException()
verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslExceptionFinal);
verify(listener, atLeastOnce()).getTransferType();
verifyNoMoreInteractions(listener);
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
assertEquals(MAX_RETRIES, _retryingBlockTransferor.getRetryCount());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,155 @@
* Utility class for collation-aware UTF8String operations.
*/
public class CollationAwareUTF8String {

/**
* The constant value to indicate that the match is not found when searching for a pattern
* string in a target string.
*/
private static final int MATCH_NOT_FOUND = -1;

/**
* Returns whether the target string starts with the specified prefix, starting from the
* specified position (0-based index referring to character position in UTF8String), with respect
* to the UTF8_BINARY_LCASE collation. The method assumes that the prefix is already lowercased
* prior to method call to avoid the overhead of calling .toLowerCase() multiple times on the
* same prefix string.
*
* @param target the string to be searched in
* @param lowercasePattern the string to be searched for
* @param startPos the start position for searching (in the target string)
* @return whether the target string starts with the specified prefix in UTF8_BINARY_LCASE
*/
public static boolean lowercaseMatchFrom(
final UTF8String target,
final UTF8String lowercasePattern,
int startPos) {
return lowercaseMatchLengthFrom(target, lowercasePattern, startPos) != MATCH_NOT_FOUND;
}

/**
* Returns the length of the substring of the target string that starts with the specified
* prefix, starting from the specified position (0-based index referring to character position
* in UTF8String), with respect to the UTF8_BINARY_LCASE collation. The method assumes that the
* prefix is already lowercased. The method only considers the part of target string that
* starts from the specified (inclusive) position (that is, the method does not look at UTF8
* characters of the target string at or after position `endPos`). If the prefix is not found,
* MATCH_NOT_FOUND is returned.
*
* @param target the string to be searched in
* @param lowercasePattern the string to be searched for
* @param startPos the start position for searching (in the target string)
* @return length of the target substring that starts with the specified prefix in lowercase
*/
private static int lowercaseMatchLengthFrom(
final UTF8String target,
final UTF8String lowercasePattern,
int startPos) {
assert startPos >= 0;
for (int len = 0; len <= target.numChars() - startPos; ++len) {
if (target.substring(startPos, startPos + len).toLowerCase().equals(lowercasePattern)) {
return len;
}
}
return MATCH_NOT_FOUND;
}

/**
* Returns the position of the first occurrence of the pattern string in the target string,
* starting from the specified position (0-based index referring to character position in
* UTF8String), with respect to the UTF8_BINARY_LCASE collation. The method assumes that the
* pattern string is already lowercased prior to call. If the pattern is not found,
* MATCH_NOT_FOUND is returned.
*
* @param target the string to be searched in
* @param lowercasePattern the string to be searched for
* @param startPos the start position for searching (in the target string)
* @return the position of the first occurrence of pattern in target
*/
private static int lowercaseFind(
final UTF8String target,
final UTF8String lowercasePattern,
int startPos) {
assert startPos >= 0;
for (int i = startPos; i <= target.numChars(); ++i) {
if (lowercaseMatchFrom(target, lowercasePattern, i)) {
return i;
}
}
return MATCH_NOT_FOUND;
}

/**
* Returns whether the target string ends with the specified suffix, ending at the specified
* position (0-based index referring to character position in UTF8String), with respect to the
* UTF8_BINARY_LCASE collation. The method assumes that the suffix is already lowercased prior
* to method call to avoid the overhead of calling .toLowerCase() multiple times on the same
* suffix string.
*
* @param target the string to be searched in
* @param lowercasePattern the string to be searched for
* @param endPos the end position for searching (in the target string)
* @return whether the target string ends with the specified suffix in lowercase
*/
public static boolean lowercaseMatchUntil(
final UTF8String target,
final UTF8String lowercasePattern,
int endPos) {
return lowercaseMatchLengthUntil(target, lowercasePattern, endPos) != MATCH_NOT_FOUND;
}

/**
* Returns the length of the substring of the target string that ends with the specified
* suffix, ending at the specified position (0-based index referring to character position in
* UTF8String), with respect to the UTF8_BINARY_LCASE collation. The method assumes that the
* suffix is already lowercased. The method only considers the part of target string that ends
* at the specified (non-inclusive) position (that is, the method does not look at UTF8
* characters of the target string at or after position `endPos`). If the suffix is not found,
* MATCH_NOT_FOUND is returned.
*
* @param target the string to be searched in
* @param lowercasePattern the string to be searched for
* @param endPos the end position for searching (in the target string)
* @return length of the target substring that ends with the specified suffix in lowercase
*/
private static int lowercaseMatchLengthUntil(
final UTF8String target,
final UTF8String lowercasePattern,
int endPos) {
assert endPos <= target.numChars();
for (int len = 0; len <= endPos; ++len) {
if (target.substring(endPos - len, endPos).toLowerCase().equals(lowercasePattern)) {
return len;
}
}
return MATCH_NOT_FOUND;
}

/**
* Returns the position of the last occurrence of the pattern string in the target string,
* ending at the specified position (0-based index referring to character position in
* UTF8String), with respect to the UTF8_BINARY_LCASE collation. The method assumes that the
* pattern string is already lowercased prior to call. If the pattern is not found,
* MATCH_NOT_FOUND is returned.
*
* @param target the string to be searched in
* @param lowercasePattern the string to be searched for
* @param endPos the end position for searching (in the target string)
* @return the position of the last occurrence of pattern in target
*/
private static int lowercaseRFind(
final UTF8String target,
final UTF8String lowercasePattern,
int endPos) {
assert endPos <= target.numChars();
for (int i = endPos; i >= 0; --i) {
if (lowercaseMatchUntil(target, lowercasePattern, i)) {
return i;
}
}
return MATCH_NOT_FOUND;
}

public static UTF8String replace(final UTF8String src, final UTF8String search,
final UTF8String replace, final int collationId) {
// This collation aware implementation is based on existing implementation on UTF8String
Expand Down Expand Up @@ -183,6 +332,23 @@ public static int findInSet(final UTF8String match, final UTF8String set, int co
return 0;
}

/**
* Returns the position of the first occurrence of the pattern string in the target string,
* starting from the specified position (0-based index referring to character position in
* UTF8String), with respect to the UTF8_BINARY_LCASE collation. If the pattern is not found,
* MATCH_NOT_FOUND is returned.
*
* @param target the string to be searched in
* @param pattern the string to be searched for
* @param start the start position for searching (in the target string)
* @return the position of the first occurrence of pattern in target
*/
public static int lowercaseIndexOf(final UTF8String target, final UTF8String pattern,
final int start) {
if (pattern.numChars() == 0) return 0;
return lowercaseFind(target, pattern.toLowerCase(), start);
}

public static int indexOf(final UTF8String target, final UTF8String pattern,
final int start, final int collationId) {
if (pattern.numBytes() == 0) {
Expand Down Expand Up @@ -467,4 +633,7 @@ public static UTF8String lowercaseTrimRight(
}
return srcString.copyUTF8String(0, trimByteIdx);
}

// TODO: Add more collation-aware UTF8String operations here.

}
Loading

0 comments on commit 5d171d6

Please sign in to comment.