Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RecordOutputStream#writeRaw emits long instead of void #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@


public interface RecordOutputStream {
public void writeRaw(byte[] record) throws IOException;
public void writeRaw(byte[] record, int start, int length) throws IOException;
public void close() throws IOException;
public void flush() throws IOException;
long writeRaw(byte[] record) throws IOException;
long writeRaw(byte[] record, int start, int length) throws IOException;
void close() throws IOException;
void flush() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,32 @@

import java.io.IOException;

import static org.apache.hadoop.io.SequenceFile.Writer.file;
import static org.apache.hadoop.io.SequenceFile.Writer.compression;
import static org.apache.hadoop.io.SequenceFile.Writer.keyClass;
import static org.apache.hadoop.io.SequenceFile.Writer.valueClass;

public class SequenceFileOutputStream implements RecordOutputStream {

private SequenceFile.Writer _writer;
private BytesWritable writable = new BytesWritable();

public SequenceFileOutputStream(FileSystem fs, Path path) throws IOException {
_writer = SequenceFile.createWriter(fs, fs.getConf(), path, BytesWritable.class, NullWritable.class, CompressionType.NONE);
_writer = SequenceFile.createWriter(fs.getConf(), file(path), keyClass(BytesWritable.class), valueClass(NullWritable.class), compression(CompressionType.NONE));
}

public SequenceFileOutputStream(FileSystem fs, Path path, CompressionType type, CompressionCodec codec) throws IOException {
_writer = SequenceFile.createWriter(fs, fs.getConf(), path, BytesWritable.class, NullWritable.class, type, codec);
_writer = SequenceFile.createWriter(fs.getConf(), file(path), keyClass(BytesWritable.class), valueClass(NullWritable.class), compression(type, codec));
}

public void writeRaw(byte[] record) throws IOException {
writeRaw(record, 0, record.length);
public long writeRaw(byte[] record) throws IOException {
return writeRaw(record, 0, record.length);
}

public void writeRaw(byte[] record, int start, int length) throws IOException {
public long writeRaw(byte[] record, int start, int length) throws IOException {
writable.set(record, start, length);
_writer.append(writable, NullWritable.get());
return _writer.getLength();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ public OutputStream getWrappedStream() {
return _raw;
}

public void writeRaw(byte[] record) throws IOException {
writeRaw(record, 0, record.length);
public long writeRaw(byte[] record) throws IOException {
return writeRaw(record, 0, record.length);
}

public void writeRaw(byte[] record, int start, int length) throws IOException {
public long writeRaw(byte[] record, int start, int length) throws IOException {
_os.writeInt(length);
_os.write(record, start, length);
return _os.size();
}

public void close() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public PailOutputStream(String userfilename, boolean overwrite) throws IOExcepti
}
}

public void writeRaw(byte[] record) throws IOException {
writeRaw(record, 0, record.length);
public long writeRaw(byte[] record) throws IOException {
return writeRaw(record, 0, record.length);
}

private Function<Object,Boolean> tryRename()
Expand Down Expand Up @@ -84,8 +84,8 @@ public void flush() throws IOException {
// NOT DOING ANYTHING TO LEAVE IT AT STATUS-QUO
}

public void writeRaw(byte[] record, int start, int length) throws IOException {
delegate.writeRaw(record, start, length);
public long writeRaw(byte[] record, int start, int length) throws IOException {
return delegate.writeRaw(record, start, length);
}
}

Expand Down Expand Up @@ -121,8 +121,8 @@ public S3PailOutputStream(String userfilename, boolean overwrite) throws IOExcep
delegate = createOutputStream(finalFile);
}

public void writeRaw(byte[] record) throws IOException {
writeRaw(record, 0, record.length);
public long writeRaw(byte[] record) throws IOException {
return writeRaw(record, 0, record.length);
}

public void close() throws IOException {
Expand All @@ -134,8 +134,8 @@ public void flush() throws IOException {
// NOT DOING ANYTHING TO LEAVE IT AT STATUS-QUO
}

public void writeRaw(byte[] record, int start, int length) throws IOException {
delegate.writeRaw(record, start, length);
public long writeRaw(byte[] record, int start, int length) throws IOException {
return delegate.writeRaw(record, start, length);
}
}

Expand Down
18 changes: 6 additions & 12 deletions dfs-datastores/src/main/java/com/backtype/hadoop/pail/Pail.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public TypedRecordOutputStream(String userfilename, boolean overwrite) {
_workers = new HashMap<String, RecordOutputStream>();
}

public <T> void writeObject(T obj) throws IOException {
public <T> long writeObject(T obj) throws IOException {
PailStructure<T> structure = ((PailStructure<T>) _structure);
List<String> rootAttrs = structure.getTarget(obj);
List<String> attrs = makeRelative(rootAttrs);
Expand All @@ -52,13 +52,7 @@ public <T> void writeObject(T obj) throws IOException {
_workers.put(targetDir, Pail.super.openWrite(p.toString(), _overwrite));
}
RecordOutputStream os = _workers.get(targetDir);
os.writeRaw(structure.serialize(obj));
}

public void writeObjects(T... objs) throws IOException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Searched all repos and we're not using this anywhere.

for(T obj: objs) {
writeObject(obj);
}
return os.writeRaw(structure.serialize(obj));
}

public void close() throws IOException {
Expand All @@ -81,16 +75,16 @@ protected List<String> makeRelative(List<String> attrs) {
return Utils.stripRoot(getAttrs(), attrs);
}

public void writeRaw(byte[] record) throws IOException {
writeRaw(record, 0, record.length);
public long writeRaw(byte[] record) throws IOException {
return writeRaw(record, 0, record.length);
}

public void writeRaw(byte[] record, int start, int length) throws IOException {
public long writeRaw(byte[] record, int start, int length) throws IOException {
if(!_workers.containsKey(_userfilename)) {
checkValidStructure(_userfilename);
_workers.put(_userfilename, Pail.super.openWrite(_userfilename, _overwrite));
}
_workers.get(_userfilename).writeRaw(record, start, length);
return _workers.get(_userfilename).writeRaw(record, start, length);
}
}

Expand Down