diff --git a/CHANGELOG.md b/CHANGELOG.md index 2216f3980..b01f534d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ #Changelog +## 7.0.10 (2016-06-27) +### 修正 +* 断点上传,skip 已上传部分 + ## 7.0.9 (2016-04-22) ### 增加 * 强制copy或者move diff --git a/gradle.properties b/gradle.properties index 11da4a462..dd7132d03 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,5 +16,5 @@ POM_DEVELOPER_ID=qiniu POM_DEVELOPER_NAME=Qiniu POM_ARTIFACT_ID=qiniu-java-sdk -POM_NAME=java-sdk +POM_NAME=qiniu-java-sdk POM_PACKAGING=jar \ No newline at end of file diff --git a/src/main/java/com/qiniu/common/Config.java b/src/main/java/com/qiniu/common/Config.java index 01894747b..c62a23383 100644 --- a/src/main/java/com/qiniu/common/Config.java +++ b/src/main/java/com/qiniu/common/Config.java @@ -6,7 +6,7 @@ public final class Config { - public static final String VERSION = "7.0.9"; + public static final String VERSION = "7.0.10"; /** * 断点上传时的分块大小(默认的分块大小, 不允许改变) */ diff --git a/src/main/java/com/qiniu/storage/ResumeUploader.java b/src/main/java/com/qiniu/storage/ResumeUploader.java index 5c0d8a68e..956292cd1 100644 --- a/src/main/java/com/qiniu/storage/ResumeUploader.java +++ b/src/main/java/com/qiniu/storage/ResumeUploader.java @@ -74,6 +74,12 @@ public Response upload() throws QiniuException { } boolean retry = false; int contextIndex = blockIdx(uploaded); + try { + file.skip(uploaded); + } catch (IOException e) { + close(); + throw new QiniuException(e); + } while (uploaded < size) { int blockSize = nextBlockSize(uploaded); try { diff --git a/src/test/java/com/qiniu/TempFile.java b/src/test/java/com/qiniu/TempFile.java index 648987838..aabcbb454 100644 --- a/src/test/java/com/qiniu/TempFile.java +++ b/src/test/java/com/qiniu/TempFile.java @@ -3,6 +3,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Random; /** * Created by bailong on 14/10/11. @@ -11,6 +12,8 @@ public final class TempFile { private TempFile() { } + static final Random r = new Random(); + public static void remove(File f) { f.delete(); } @@ -22,7 +25,62 @@ public static File createFile(int kiloSize) throws IOException { File f = File.createTempFile("qiniu_" + kiloSize + "k", "tmp"); f.createNewFile(); fos = new FileOutputStream(f); - byte[] b = getByte(); + byte[] b = getByte((byte) r.nextInt()); + long s = 0; + while (s < size) { + int l = (int) Math.min(b.length, size - s); + fos.write(b, 0, l); + s += l; + // 随机生成的文件的每一段(<4M)都不一样 + b = getByte((byte) r.nextInt()); + } + fos.flush(); + return f; + } finally { + if (fos != null) { + try { + fos.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + private static byte[] getByte(byte b) { + int len = 498 * 4; + byte[] bs = new byte[len]; + + for (int i = 1; i < len; i++) { + bs[i] = b; + } + + bs[10] = (byte) r.nextInt(); + bs[9] = (byte) r.nextInt(); + bs[8] = (byte) r.nextInt(); + bs[7] = (byte) r.nextInt(); + bs[6] = (byte) r.nextInt(); + bs[5] = (byte) r.nextInt(); + bs[4] = (byte) r.nextInt(); + bs[3] = (byte) r.nextInt(); + bs[3] = (byte) r.nextInt(); + bs[1] = (byte) r.nextInt(); + bs[0] = (byte) r.nextInt(); + + bs[len - 2] = '\r'; + bs[len - 1] = '\n'; + return bs; + } + + + public static File createFileOld(int kiloSize) throws IOException { + FileOutputStream fos = null; + try { + long size = (long) (1024 * kiloSize); + File f = File.createTempFile("qiniu_" + kiloSize + "k", "tmp"); + f.createNewFile(); + fos = new FileOutputStream(f); + byte[] b = getByteOld(); long s = 0; while (s < size) { int l = (int) Math.min(b.length, size - s); @@ -42,7 +100,7 @@ public static File createFile(int kiloSize) throws IOException { } } - private static byte[] getByte() { + private static byte[] getByteOld() { byte[] b = new byte[1024 * 4]; b[0] = 'A'; for (int i = 1; i < 1024 * 4; i++) { diff --git a/src/test/java/com/qiniu/storage/RecordUploadTest.java b/src/test/java/com/qiniu/storage/RecordUploadTest.java index 3c18aeb7b..c8cafd9cd 100644 --- a/src/test/java/com/qiniu/storage/RecordUploadTest.java +++ b/src/test/java/com/qiniu/storage/RecordUploadTest.java @@ -2,19 +2,20 @@ import com.qiniu.TempFile; import com.qiniu.TestConfig; +import com.qiniu.common.Config; +import com.qiniu.http.Client; import com.qiniu.http.Response; import com.qiniu.storage.persistent.FileRecorder; +import com.qiniu.util.Etag; import com.qiniu.util.UrlSafeBase64; import org.junit.Test; import java.io.File; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.Date; import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import static org.junit.Assert.*; @@ -22,98 +23,100 @@ * Created by Simon on 2015/3/30. */ public class RecordUploadTest { - - private boolean isDone = false; + final Random r = new Random(); private Response response = null; + final RecordKeyGenerator keyGen = new RecordKeyGenerator() { + @Override + public String gen(String key, File file) { + return key + "_._" + file.getAbsolutePath(); + } + }; + FileRecorder recorder = null; + final Client client = new Client(); - private void template(int size) throws IOException { - isDone = false; - ExecutorService threadPool = Executors.newSingleThreadExecutor(); + private void template(final int size) throws IOException { + response = null; final String expectKey = "\r\n?&r=" + size + "k"; final File f = TempFile.createFile(size); - final String token = TestConfig.testAuth.uploadToken(TestConfig.bucket, expectKey); - final FileRecorder recorder = new FileRecorder(f.getParentFile()); - final RecordKeyGenerator keyGen = new RecordKeyGenerator() { - @Override - public String gen(String key, File file) { - return key + "_._" + file.getAbsolutePath(); - } - }; - final String recordKey = keyGen.gen(expectKey, f); - - UploadManager uploadManager = new UploadManager(recorder, keyGen); -// Response res = uploadManager.put(f, expectKey, token); - - Up up = new Up(uploadManager, f, expectKey, token); - - - // 显示断点记录文件 - Thread showRecord = new Thread() { - public void run() { - for (; ; ) { - doSleep(30); - showRecord("normal: ", recorder, recordKey); + recorder = new FileRecorder(f.getParentFile()); + try { + final String token = TestConfig.testAuth.uploadToken(TestConfig.bucket, expectKey); + final String recordKey = keyGen.gen(expectKey, f); + + // 开始第一部分上传 + final Up up = new Up(f, expectKey, token); + new Thread() { + @Override + public void run() { + int i = r.nextInt(10000); + try { + System.out.println("UP: " + i + ", enter run"); + response = up.up(); + System.out.println("UP: " + i + ", left run"); + } catch (Exception e) { + System.out.println("UP: " + i + ", exception run"); + e.printStackTrace(); + } } - } - }; - showRecord.setDaemon(true); - showRecord.start(); - - final String p = f.getParentFile().getAbsolutePath() + "\\" + UrlSafeBase64.encodeToString(recordKey); - System.out.println(p); - System.out.println(new File(p).exists()); - - final Random r = new Random(); - - boolean shutDown = true; - - for (int i = 10; i > 0; i--) { - final int t = r.nextInt(100) + 80; - System.out.println(i + " : " + t); - final Future future = threadPool.submit(up); - - // CHECKSTYLE:OFF - // 中断线程 1 次 - if (shutDown) { - new Thread() { - public void run() { - // 百毫秒 - doSleep(t); - if (!future.isDone()) { - future.cancel(true); - } + } .start(); + + final boolean[] ch = new boolean[]{true}; + // 显示断点记录文件 + Thread showRecord = new Thread() { + public void run() { + for (; ch[0]; ) { + doSleep(1000); + showRecord("normal: " + size + " :", recorder, recordKey); + } + } + }; + showRecord.setDaemon(true); + showRecord.start(); + + if (f.length() > Config.BLOCK_SIZE) { + // 终止第一部分上传,期望其部分成功 + for (int i = 150; i > 0; --i) { + byte[] data = getRecord(recorder, recordKey); + if (data != null) { + up.close(); + doSleep(100); + break; } - }.start(); - shutDown = false; + doSleep(200); + } + up.close(); + doSleep(500); } - // CHECKSTYLE:ON - showRecord("new future: ", recorder, recordKey); - try { - Response res = future.get(); - response = res; - showRecord("done: ", recorder, recordKey); - System.out.println("break"); - break; - } catch (Exception e) { - System.out.println("Exception"); - System.out.println(Thread.currentThread().getId() + " : future.isCancelled : " + future.isCancelled()); -// e.printStackTrace(); - if (isDone) { - break; + System.out.println("response is " + response); + + // 若第一部分上传部分未全部成功,再次上传 + if (response == null) { + try { + response = new Up(f, expectKey, token).up(); + } catch (Exception e) { + e.printStackTrace(); } } - doSleep(30); - System.out.println(p); - System.out.println(new File(p).exists()); - } - TempFile.remove(f); - assertFalse(new File(p).exists()); - assertNotNull(response); - assertTrue(response.isOK()); + showRecord("done: " + size + " :", recorder, recordKey); + + ch[0] = false; + + String etag = Etag.file(f); + System.out.println("etag: " + etag); + String hash = response.jsonToMap().get("hash").toString(); + System.out.println("hash: " + hash); - showRecord("nodata: ", recorder, recordKey); + assertNotNull(response); + assertTrue(response.isOK()); + assertEquals(etag, hash); + doSleep(100); + showRecord("nodata: " + size + " :", recorder, recordKey); + assertNull(recorder.get(recordKey)); + } finally { + TempFile.remove(f); + } } private void showRecord(String pre, FileRecorder recorder, String recordKey) { @@ -129,9 +132,17 @@ private void showRecord(String pre, FileRecorder recorder, String recordKey) { } } - private void doSleep(int bm) { + private byte[] getRecord(FileRecorder recorder, String recordKey) { + byte[] data = recorder.get(recordKey); + if (data != null && data.length < 100) { + return null; + } + return data; + } + + private void doSleep(int m) { try { - Thread.sleep(100 * bm); + Thread.sleep(m); } catch (InterruptedException e) { //e.printStackTrace(); } @@ -155,6 +166,14 @@ public void test4M() throws Throwable { template(1024 * 4); } + @Test + public void test4M1K() throws Throwable { + if (TestConfig.isTravis()) { + return; + } + template(1024 * 4 + 1); + } + @Test public void test8M1k() throws Throwable { if (TestConfig.isTravis()) { @@ -171,30 +190,6 @@ public void test25M1k() throws Throwable { template(1024 * 25 + 1); } - class Up implements Callable { - private final UploadManager uploadManager; - private final File file; - private final String key; - private final String token; - - public Up(UploadManager uploadManager, File file, String key, String token) { - this.uploadManager = uploadManager; - this.file = file; - this.key = key; - this.token = token; - } - - @Override - public Response call() throws Exception { - Response res = uploadManager.put(file, key, token); - System.out.println("up: " + res); - System.out.println("up: " + res.bodyString()); - isDone = true; - response = res; - return res; - } - } - @Test public void testLastModify() throws IOException { File f = File.createTempFile("qiniutest", "b"); @@ -240,4 +235,59 @@ public void testLastModify() throws IOException { long m4 = recoderFile.lastModified(); assertTrue(m4 > m1); } + + class Up { + private final File file; + private final String key; + private final String token; + ResumeUploader uploader = null; + + public Up(File file, String key, String token) { + this.file = file; + this.key = key; + this.token = token; + } + + public void close() { + System.out.println("UP going to close"); + // 调用 uploader 私有方法 close() + try { + Method m_close = ResumeUploader.class.getDeclaredMethod("close", new Class[]{}); + m_close.setAccessible(true); + m_close.invoke(uploader, new Object[]{}); + } catch (NoSuchMethodException e) { + e.printStackTrace(); + } catch (InvocationTargetException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + System.out.println("UP closed"); + } + + + public Response up() throws Exception { + int i = r.nextInt(10000); + try { + System.out.println("UP: " + i + ", enter up"); + + String recorderKey = key; + if (keyGen != null) { + recorderKey = keyGen.gen(key, file); + } + + if (recorder == null) { + recorder = new FileRecorder(file.getParentFile()); + } + uploader = new ResumeUploader(client, token, key, file, + null, Client.DefaultMime, recorder, recorderKey); + Response res = uploader.upload(); + System.out.println("UP: " + i + ", left up"); + return res; + } catch (Exception e) { + System.out.println("UP: " + i + ", exception up"); + throw e; + } + } + } } diff --git a/src/test/java/com/qiniu/util/EtagTest.java b/src/test/java/com/qiniu/util/EtagTest.java index ba00aa125..cc600493b 100644 --- a/src/test/java/com/qiniu/util/EtagTest.java +++ b/src/test/java/com/qiniu/util/EtagTest.java @@ -21,19 +21,19 @@ public void testData() { @Test public void testFile() throws IOException { - File f = TempFile.createFile(1024); + File f = TempFile.createFileOld(1024); assertEquals("Foyl8onxBLWeRLL5oItRJphv6i4b", Etag.file(f)); TempFile.remove(f); - f = TempFile.createFile(4 * 1024); + f = TempFile.createFileOld(4 * 1024); assertEquals("FicHOveBNs5Kn9d74M3b9tI4D-8r", Etag.file(f)); TempFile.remove(f); - f = TempFile.createFile(5 * 1024); + f = TempFile.createFileOld(5 * 1024); assertEquals("lg-Eb5KFCuZn-cUfj_oS2PPOU9xy", Etag.file(f)); TempFile.remove(f); - f = TempFile.createFile(8 * 1024); + f = TempFile.createFileOld(8 * 1024); assertEquals("lkSKZOMToDp-EqLDVuT1pyjQssl-", Etag.file(f)); TempFile.remove(f); - f = TempFile.createFile(9 * 1024); + f = TempFile.createFileOld(9 * 1024); assertEquals("ljgVjMtyMsOgIySv79U8Qz4TrUO4", Etag.file(f)); TempFile.remove(f); }