mirror of
https://github.com/microsoft/HybridRow.git
synced 2026-01-19 17:33:13 +00:00
Move RowScanner and DataItem out of test into main
This commit is contained in:
@@ -93,7 +93,7 @@ public final class Utf8String implements ByteBufHolder, CharSequence, Comparable
|
||||
|
||||
@Override
|
||||
public char charAt(final int index) {
|
||||
throw new UnsupportedOperationException();
|
||||
throw new UnsupportedOperationException(lenientFormat("Utf8String.charAt(index: %s)", index));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -101,7 +101,13 @@ public final class Utf8String implements ByteBufHolder, CharSequence, Comparable
|
||||
*/
|
||||
@Override
|
||||
public IntStream chars() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
return this.buffer == null || this.buffer.writerIndex() == 0
|
||||
? IntStream.empty()
|
||||
: StreamSupport.intStream(
|
||||
() -> Spliterators.spliteratorUnknownSize(new UTF16CodeUnitIterator(this.buffer), Spliterator.ORDERED),
|
||||
Spliterator.ORDERED,false
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -109,13 +115,12 @@ public final class Utf8String implements ByteBufHolder, CharSequence, Comparable
|
||||
*/
|
||||
public final IntStream codePoints() {
|
||||
|
||||
if (this.buffer == null || this.buffer.writerIndex() == 0) {
|
||||
return IntStream.empty();
|
||||
}
|
||||
|
||||
return StreamSupport.intStream(
|
||||
() -> Spliterators.spliteratorUnknownSize(new CodePointIterator(this.buffer), Spliterator.ORDERED),
|
||||
Spliterator.ORDERED,false);
|
||||
return this.buffer == null || this.buffer.writerIndex() == 0
|
||||
? IntStream.empty()
|
||||
: StreamSupport.intStream(
|
||||
() -> Spliterators.spliteratorUnknownSize(new CodePointIterator(this.buffer), Spliterator.ORDERED),
|
||||
Spliterator.ORDERED,false
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -468,6 +473,9 @@ public final class Utf8String implements ByteBufHolder, CharSequence, Comparable
|
||||
if (this.buffer == null) {
|
||||
return null;
|
||||
}
|
||||
if (this.buffer.writerIndex() == 0) {
|
||||
return "";
|
||||
}
|
||||
return this.buffer.getCharSequence(0, this.buffer.writerIndex(), UTF_8).toString();
|
||||
}
|
||||
|
||||
@@ -578,6 +586,60 @@ public final class Utf8String implements ByteBufHolder, CharSequence, Comparable
|
||||
return -1;
|
||||
}
|
||||
|
||||
// region Types
|
||||
|
||||
private static final class UTF16CodeUnitIterator extends UTF8CodePointGetter implements IntIterator.OfInt {
|
||||
|
||||
private final ByteBuf buffer;
|
||||
private int start, length;
|
||||
private int lowSurrogate;
|
||||
|
||||
UTF16CodeUnitIterator(final ByteBuf buffer) {
|
||||
this.buffer = buffer;
|
||||
this.lowSurrogate = 0;
|
||||
this.start = 0;
|
||||
this.length = buffer.writerIndex();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return (this.lowSurrogate != 0) || (0 <= this.start && this.start < this.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the next {@code int} code point in the iteration.
|
||||
*
|
||||
* @return the next {@code int} code point in the iteration.
|
||||
* @throws NoSuchElementException if the iteration has no more code points.
|
||||
*/
|
||||
@Override
|
||||
public int nextInt() {
|
||||
|
||||
if (!this.hasNext()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
|
||||
if (this.lowSurrogate != 0) {
|
||||
int codeUnit = this.lowSurrogate;
|
||||
this.lowSurrogate = 0;
|
||||
return codeUnit;
|
||||
}
|
||||
|
||||
final int index = this.buffer.forEachByte(this.start, this.length - this.start, this);
|
||||
assert index >= 0;
|
||||
this.start = index + 1;
|
||||
|
||||
final int codePoint = this.codePoint();
|
||||
|
||||
if ((codePoint & 0xFFFF0000) == 0) {
|
||||
return codePoint;
|
||||
}
|
||||
|
||||
this.lowSurrogate = Character.lowSurrogate(codePoint);
|
||||
return Character.highSurrogate(codePoint);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class CodePointIterator extends UTF8CodePointGetter implements IntIterator.OfInt {
|
||||
|
||||
private final ByteBuf buffer;
|
||||
@@ -648,7 +710,7 @@ public final class Utf8String implements ByteBufHolder, CharSequence, Comparable
|
||||
|
||||
@Override
|
||||
public void serialize(Utf8String value, JsonGenerator generator, SerializerProvider provider) throws IOException {
|
||||
generator.writeString(value.toString());
|
||||
generator.writeString(value.toUtf16());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -900,4 +962,6 @@ public final class Utf8String implements ByteBufHolder, CharSequence, Comparable
|
||||
return this.codePoint;
|
||||
}
|
||||
}
|
||||
|
||||
// endregion
|
||||
}
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
package com.azure.data.cosmos.serialization.hybridrow.io;
|
||||
|
||||
import com.azure.data.cosmos.core.Json;
|
||||
import com.azure.data.cosmos.core.Utf8String;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.LayoutCode;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Suppliers;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
/**
|
||||
* A path/type/value triplet representing a field in a HybridRow.
|
||||
*/
|
||||
public class DataItem {
|
||||
|
||||
private final Supplier<String> fullPath;
|
||||
private final Supplier<String> name;
|
||||
|
||||
@JsonProperty
|
||||
private final List<String> nodes;
|
||||
|
||||
@JsonProperty
|
||||
private final LayoutCode type;
|
||||
|
||||
@JsonProperty
|
||||
private final Object value;
|
||||
|
||||
@SuppressWarnings("UnstableApiUsage")
|
||||
DataItem(
|
||||
@Nonnull final List<Utf8String> nodes,
|
||||
@Nonnull final Utf8String name,
|
||||
@Nonnull final LayoutCode type,
|
||||
@Nonnull final Object value) {
|
||||
|
||||
checkNotNull(nodes, "expected non-null nodes");
|
||||
checkNotNull(name, "expected non-null name");
|
||||
checkNotNull(type, "expected non-null type");
|
||||
checkNotNull(value, "expected non-null value");
|
||||
|
||||
//noinspection ConstantConditions
|
||||
this.nodes = ImmutableList.<String>builderWithExpectedSize(nodes.size() + 1)
|
||||
.addAll(nodes.stream().map(Utf8String::toUtf16).iterator())
|
||||
.add(name.toUtf16())
|
||||
.build();
|
||||
|
||||
this.type = type;
|
||||
this.value = value;
|
||||
|
||||
this.name = Suppliers.memoize(() -> this.nodes.get(this.nodes.size() - 1));
|
||||
|
||||
this.fullPath = Suppliers.memoize(() -> {
|
||||
|
||||
if (this.nodes.size() == 1) {
|
||||
return this.nodes.get(0);
|
||||
}
|
||||
|
||||
StringBuilder builder = new StringBuilder(this.nodes.stream()
|
||||
.map(String::length)
|
||||
.reduce(this.nodes.size() - 1, Integer::sum)
|
||||
);
|
||||
|
||||
int i;
|
||||
|
||||
for (i = 0; i < this.nodes.size() - 1; i++) {
|
||||
builder.append(this.nodes.get(i));
|
||||
}
|
||||
|
||||
return builder.append(this.nodes.get(i)).toString();
|
||||
});
|
||||
}
|
||||
|
||||
public String name() {
|
||||
return this.name.get();
|
||||
}
|
||||
|
||||
public List<String> nodes() {
|
||||
return this.nodes;
|
||||
}
|
||||
|
||||
public String path() {
|
||||
return this.fullPath.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Json.toString(this);
|
||||
}
|
||||
|
||||
public LayoutCode type() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public Object value() {
|
||||
return this.value;
|
||||
}
|
||||
}
|
||||
@@ -163,6 +163,10 @@ public final class RowReader {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
public boolean isDone() {
|
||||
return this.state == States.DONE;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@code true} if field has a value--if positioned on a field--undefined otherwise.
|
||||
* <p>
|
||||
@@ -690,8 +694,8 @@ public final class RowReader {
|
||||
* @return a new {@link RowReader}.
|
||||
*/
|
||||
public @Nonnull RowReader readScope() {
|
||||
RowCursor newScope = this.buffer.sparseIteratorReadScope(this.cursor, true);
|
||||
return new RowReader(this.buffer, newScope);
|
||||
RowCursor scope = this.buffer.sparseIteratorReadScope(this.cursor, true);
|
||||
return new RowReader(this.buffer, scope);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,259 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
package com.azure.data.cosmos.serialization.hybridrow.io;
|
||||
|
||||
import com.azure.data.cosmos.core.Out;
|
||||
import com.azure.data.cosmos.core.Utf8String;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.HybridRowVersion;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.Result;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.RowBuffer;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.LayoutResolver;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.LayoutResolverNamespace;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.LayoutType;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.schemas.Namespace;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Stack;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.common.base.Strings.lenientFormat;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
public class RowScanner implements AutoCloseable {
|
||||
|
||||
private final AtomicBoolean closed;
|
||||
private final ByteBuf data;
|
||||
private final LayoutResolver resolver;
|
||||
|
||||
private RowScanner(LayoutResolver resolver, ByteBuf data) {
|
||||
this.closed = new AtomicBoolean();
|
||||
this.data = data.retain();
|
||||
this.resolver = resolver;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (this.closed.compareAndSet(false, true)) {
|
||||
this.data.release();
|
||||
}
|
||||
}
|
||||
|
||||
public static RowScanner open(@Nonnull Namespace namespace, @Nonnull File file) throws IOException {
|
||||
|
||||
checkNotNull(file, "expected non-null file");
|
||||
|
||||
final long length = file.length();
|
||||
checkArgument(0 < length, "file does not exist: %s", file);
|
||||
checkArgument(length <= Integer.MAX_VALUE, "expected file length <= %s, not %s", Integer.MAX_VALUE, length);
|
||||
|
||||
ByteBuf data = Unpooled.buffer((int) length);
|
||||
|
||||
try (InputStream stream = Files.newInputStream(file.toPath())) {
|
||||
data.writeBytes(stream, (int) length);
|
||||
}
|
||||
|
||||
LayoutResolverNamespace resolver = new LayoutResolverNamespace(namespace);
|
||||
return new RowScanner(resolver, data);
|
||||
}
|
||||
|
||||
public static RowScanner open(@Nonnull Namespace namespace, @Nonnull Path path) throws IOException {
|
||||
return RowScanner.open(namespace, requireNonNull(path, "expected non-null path").toFile());
|
||||
}
|
||||
|
||||
public static RowScanner open(@Nonnull Namespace namespace, @Nonnull String path) throws IOException {
|
||||
return RowScanner.open(namespace, new File(requireNonNull(path, "expected non-null path")));
|
||||
}
|
||||
|
||||
public <TContext> Result visit(BiFunction<DataItem, TContext, Result> accept, TContext context) {
|
||||
|
||||
checkState(!this.closed.get(), "RowScanner is closed");
|
||||
|
||||
final RowBuffer buffer = new RowBuffer(this.data, HybridRowVersion.V1, this.resolver);
|
||||
final RowReader reader = new RowReader(buffer);
|
||||
|
||||
return visit(reader, new Visitor<>(accept, context, new Stack<>()));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <TContext> Result visit(RowReader reader, Visitor<TContext> visitor) {
|
||||
|
||||
final Out value = new Out();
|
||||
|
||||
while (reader.read()) {
|
||||
|
||||
Utf8String path = reader.path();
|
||||
checkState(!path.isNull(), "expected non-null value for path");
|
||||
|
||||
LayoutType type = reader.type();
|
||||
checkState(type != null, "expected non-null type");
|
||||
|
||||
final Result result;
|
||||
value.set(null);
|
||||
|
||||
switch (type.layoutCode()) {
|
||||
|
||||
case BOOLEAN: {
|
||||
result = reader.readBoolean(value);
|
||||
break;
|
||||
}
|
||||
case INT_16: {
|
||||
result = reader.readInt16(value);
|
||||
break;
|
||||
}
|
||||
case INT_32: {
|
||||
result = reader.readInt32(value);
|
||||
break;
|
||||
}
|
||||
case INT_64: {
|
||||
result = reader.readInt64(value);
|
||||
break;
|
||||
}
|
||||
case UINT_8: {
|
||||
result = reader.readUInt8(value);
|
||||
break;
|
||||
}
|
||||
case UINT_32: {
|
||||
result = reader.readUInt32(value);
|
||||
break;
|
||||
}
|
||||
case UINT_64: {
|
||||
result = reader.readUInt64(value);
|
||||
break;
|
||||
}
|
||||
case BINARY: {
|
||||
result = reader.readBinary(value);
|
||||
break;
|
||||
}
|
||||
case GUID: {
|
||||
result = reader.readGuid(value);
|
||||
break;
|
||||
}
|
||||
case NULL:
|
||||
case BOOLEAN_FALSE:
|
||||
case INT_8:
|
||||
case UINT_16:
|
||||
case VAR_INT:
|
||||
case VAR_UINT:
|
||||
case FLOAT_32:
|
||||
case FLOAT_64:
|
||||
case FLOAT_128:
|
||||
case DECIMAL:
|
||||
case DATE_TIME:
|
||||
case UNIX_DATE_TIME:
|
||||
case UTF_8: {
|
||||
result = Result.SUCCESS;
|
||||
break;
|
||||
}
|
||||
case NULLABLE_SCOPE:
|
||||
case IMMUTABLE_NULLABLE_SCOPE: {
|
||||
if (!reader.hasValue()) {
|
||||
result = Result.SUCCESS;
|
||||
break;
|
||||
}
|
||||
}
|
||||
case ARRAY_SCOPE:
|
||||
case IMMUTABLE_ARRAY_SCOPE:
|
||||
|
||||
case MAP_SCOPE:
|
||||
case IMMUTABLE_MAP_SCOPE:
|
||||
|
||||
case OBJECT_SCOPE:
|
||||
case IMMUTABLE_OBJECT_SCOPE:
|
||||
|
||||
case SCHEMA:
|
||||
case IMMUTABLE_SCHEMA:
|
||||
|
||||
case SET_SCOPE:
|
||||
case IMMUTABLE_SET_SCOPE:
|
||||
|
||||
case TAGGED2_SCOPE:
|
||||
case IMMUTABLE_TAGGED2_SCOPE:
|
||||
|
||||
case TAGGED_SCOPE:
|
||||
case IMMUTABLE_TAGGED_SCOPE:
|
||||
|
||||
case TUPLE_SCOPE:
|
||||
case IMMUTABLE_TUPLE_SCOPE:
|
||||
|
||||
case TYPED_ARRAY_SCOPE:
|
||||
case IMMUTABLE_TYPED_ARRAY_SCOPE:
|
||||
|
||||
case TYPED_MAP_SCOPE:
|
||||
case IMMUTABLE_TYPED_MAP_SCOPE:
|
||||
|
||||
case TYPED_SET_SCOPE:
|
||||
case IMMUTABLE_TYPED_SET_SCOPE:
|
||||
|
||||
case TYPED_TUPLE_SCOPE:
|
||||
case IMMUTABLE_TYPED_TUPLE_SCOPE: {
|
||||
|
||||
visitor.nodes().push(path);
|
||||
result = reader.readScope(visitor, RowScanner::visit);
|
||||
visitor.nodes().pop();
|
||||
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
case END_SCOPE: {
|
||||
throw new IllegalStateException(lenientFormat("unexpected layout type: %s", type));
|
||||
}
|
||||
case INVALID:
|
||||
case MONGODB_OBJECT_ID: {
|
||||
throw new IllegalStateException(lenientFormat("unsupported layout type: %s", type));
|
||||
}
|
||||
default: {
|
||||
throw new IllegalStateException(lenientFormat("unknown layout type: %s", type));
|
||||
}
|
||||
}
|
||||
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
|
||||
DataItem item = new DataItem(visitor.nodes(), path, type.layoutCode(), value.get());
|
||||
visitor.accept().apply(item, visitor.context());
|
||||
}
|
||||
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
|
||||
private static class Visitor<TContext> {
|
||||
|
||||
private final BiFunction<DataItem, TContext, Result> accept;
|
||||
private final TContext context;
|
||||
private final Stack<Utf8String> nodes;
|
||||
|
||||
Visitor(BiFunction<DataItem, TContext, Result> accept, TContext context, Stack<Utf8String> nodes) {
|
||||
this.accept = accept;
|
||||
this.context = context;
|
||||
this.nodes = nodes;
|
||||
}
|
||||
|
||||
BiFunction<DataItem, TContext, Result> accept() {
|
||||
return this.accept;
|
||||
}
|
||||
|
||||
TContext context() {
|
||||
return this.context;
|
||||
}
|
||||
|
||||
Stack<Utf8String> nodes() {
|
||||
return this.nodes;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,6 @@ import com.azure.data.cosmos.core.Utf8String;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.HybridRowVersion;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.Result;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.RowBuffer;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.LayoutCode;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.LayoutResolver;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.LayoutResolverNamespace;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.LayoutType;
|
||||
@@ -21,22 +20,36 @@ import org.testng.annotations.Factory;
|
||||
import org.testng.annotations.Test;
|
||||
import org.testng.util.Strings;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Objects;
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Stack;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.common.base.Strings.lenientFormat;
|
||||
import static org.testng.Assert.*;
|
||||
import static java.lang.System.out;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.assertNull;
|
||||
import static org.testng.Assert.assertTrue;
|
||||
import static org.testng.Assert.fail;
|
||||
import static org.testng.AssertJUnit.assertNotNull;
|
||||
|
||||
public class RowReaderTest {
|
||||
|
||||
private static final String basedir = System.getProperty("project.basedir", System.getProperty("user.dir"));
|
||||
private final File schemaFile;
|
||||
|
||||
private final Path dataFile;
|
||||
private final File schemaFile;
|
||||
|
||||
private Namespace namespace;
|
||||
|
||||
private RowReaderTest(File schemaFile, Path dataFile) {
|
||||
@@ -53,7 +66,17 @@ public class RowReaderTest {
|
||||
}
|
||||
|
||||
@Test(groups = "unit")
|
||||
public void testRead() {
|
||||
public void testIterable() throws IOException {
|
||||
|
||||
final RowIterable iterable = RowIterable.of(this.namespace, this.dataFile);
|
||||
|
||||
for (DataItem item : iterable) {
|
||||
assertNotNull(item.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(groups = "unit")
|
||||
public void testReader() {
|
||||
|
||||
final long length = this.dataFile.toFile().length();
|
||||
assertTrue(0 < length && length < Integer.MAX_VALUE);
|
||||
@@ -81,6 +104,20 @@ public class RowReaderTest {
|
||||
assertEquals(result, Result.SUCCESS);
|
||||
}
|
||||
|
||||
@Test(groups = "unit")
|
||||
public void testScanner() throws IOException {
|
||||
|
||||
final RowScanner scanner = RowScanner.open(this.namespace, this.dataFile);
|
||||
|
||||
scanner.visit((DataItem item, Object context) -> {
|
||||
assertNull(context);
|
||||
assertNotNull(item);
|
||||
out.println(item);
|
||||
return Result.SUCCESS;
|
||||
}, null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Result visitFields(RowReader reader, int level) {
|
||||
|
||||
Out out = new Out();
|
||||
@@ -98,6 +135,7 @@ public class RowReaderTest {
|
||||
out.set(null);
|
||||
|
||||
switch (type.layoutCode()) {
|
||||
|
||||
case BOOLEAN: {
|
||||
result = reader.readBoolean(out);
|
||||
break;
|
||||
@@ -242,4 +280,225 @@ public class RowReaderTest {
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public static class RowIterable implements Iterable<DataItem> {
|
||||
|
||||
final ByteBuf data;
|
||||
final LayoutResolver resolver;
|
||||
|
||||
private RowIterable(LayoutResolver resolver, ByteBuf data) {
|
||||
this.resolver = resolver;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Iterator<DataItem> iterator() {
|
||||
final RowBuffer buffer = new RowBuffer(this.data, HybridRowVersion.V1, this.resolver);
|
||||
final RowReader reader = new RowReader(buffer);
|
||||
return new RowIterator(reader);
|
||||
}
|
||||
|
||||
public static RowIterable of(@Nonnull Namespace namespace, @Nonnull File file) throws IOException {
|
||||
|
||||
checkNotNull(file, "expected non-null file");
|
||||
|
||||
final long length = file.length();
|
||||
checkArgument(0 < length, "file does not exist: %s", file);
|
||||
checkArgument(length <= Integer.MAX_VALUE, "expected file length <= %s, not %s", Integer.MAX_VALUE, length);
|
||||
|
||||
ByteBuf data = Unpooled.buffer((int) length);
|
||||
|
||||
try (InputStream stream = Files.newInputStream(file.toPath())) {
|
||||
data.writeBytes(stream, (int) length);
|
||||
}
|
||||
|
||||
LayoutResolverNamespace resolver = new LayoutResolverNamespace(namespace);
|
||||
return new RowIterable(resolver, data);
|
||||
}
|
||||
|
||||
public static RowIterable of(@Nonnull Namespace namespace, @Nonnull Path path) throws IOException {
|
||||
return RowIterable.of(namespace, requireNonNull(path, "expected non-null path").toFile());
|
||||
}
|
||||
|
||||
public static RowIterable of(@Nonnull Namespace namespace, @Nonnull String path) throws IOException {
|
||||
return RowIterable.of(namespace, new File(requireNonNull(path, "expected non-null path")));
|
||||
}
|
||||
|
||||
private static class RowIterator implements Iterator<DataItem> {
|
||||
|
||||
final Stack<Utf8String> paths;
|
||||
final Stack<RowReader> readers;
|
||||
final Out value;
|
||||
|
||||
RowReader reader;
|
||||
Result result;
|
||||
|
||||
RowIterator(RowReader reader) {
|
||||
this.readers = new Stack<>();
|
||||
this.paths = new Stack<>();
|
||||
this.reader = reader;
|
||||
this.value = new Out();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return this.reader != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the next element in the iteration.
|
||||
*
|
||||
* @return the next element in the iteration
|
||||
* @throws NoSuchElementException if the iteration has no more elements
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public DataItem next() {
|
||||
|
||||
do {
|
||||
while (this.reader.read()) {
|
||||
|
||||
Utf8String path = this.reader.path();
|
||||
checkState(!path.isNull(), "expected non-null path");
|
||||
|
||||
LayoutType type = this.reader.type();
|
||||
checkState(type != null, "expected non-null type");
|
||||
|
||||
switch (type.layoutCode()) {
|
||||
|
||||
case BOOLEAN: {
|
||||
this.result = this.reader.readBoolean(this.value);
|
||||
break;
|
||||
}
|
||||
case INT_16: {
|
||||
this.result = this.reader.readInt16(this.value);
|
||||
break;
|
||||
}
|
||||
case INT_32: {
|
||||
this.result = this.reader.readInt32(this.value);
|
||||
break;
|
||||
}
|
||||
case INT_64: {
|
||||
this.result = this.reader.readInt64(this.value);
|
||||
break;
|
||||
}
|
||||
case UINT_8: {
|
||||
this.result = this.reader.readUInt8(this.value);
|
||||
break;
|
||||
}
|
||||
case UINT_32: {
|
||||
this.result = this.reader.readUInt32(this.value);
|
||||
break;
|
||||
}
|
||||
case UINT_64: {
|
||||
this.result = this.reader.readUInt64(this.value);
|
||||
break;
|
||||
}
|
||||
case BINARY: {
|
||||
this.result = this.reader.readBinary(this.value);
|
||||
break;
|
||||
}
|
||||
case GUID: {
|
||||
this.result = this.reader.readGuid(this.value);
|
||||
break;
|
||||
}
|
||||
case NULL:
|
||||
case BOOLEAN_FALSE:
|
||||
case INT_8:
|
||||
case UINT_16:
|
||||
case VAR_INT:
|
||||
case VAR_UINT:
|
||||
case FLOAT_32:
|
||||
case FLOAT_64:
|
||||
case FLOAT_128:
|
||||
case DECIMAL:
|
||||
case DATE_TIME:
|
||||
case UNIX_DATE_TIME:
|
||||
case UTF_8: {
|
||||
break;
|
||||
}
|
||||
case NULLABLE_SCOPE:
|
||||
case IMMUTABLE_NULLABLE_SCOPE: {
|
||||
if (!this.reader.hasValue()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
case ARRAY_SCOPE:
|
||||
case IMMUTABLE_ARRAY_SCOPE:
|
||||
|
||||
case MAP_SCOPE:
|
||||
case IMMUTABLE_MAP_SCOPE:
|
||||
|
||||
case OBJECT_SCOPE:
|
||||
case IMMUTABLE_OBJECT_SCOPE:
|
||||
|
||||
case SCHEMA:
|
||||
case IMMUTABLE_SCHEMA:
|
||||
|
||||
case SET_SCOPE:
|
||||
case IMMUTABLE_SET_SCOPE:
|
||||
|
||||
case TAGGED2_SCOPE:
|
||||
case IMMUTABLE_TAGGED2_SCOPE:
|
||||
|
||||
case TAGGED_SCOPE:
|
||||
case IMMUTABLE_TAGGED_SCOPE:
|
||||
|
||||
case TUPLE_SCOPE:
|
||||
case IMMUTABLE_TUPLE_SCOPE:
|
||||
|
||||
case TYPED_ARRAY_SCOPE:
|
||||
case IMMUTABLE_TYPED_ARRAY_SCOPE:
|
||||
|
||||
case TYPED_MAP_SCOPE:
|
||||
case IMMUTABLE_TYPED_MAP_SCOPE:
|
||||
|
||||
case TYPED_SET_SCOPE:
|
||||
case IMMUTABLE_TYPED_SET_SCOPE:
|
||||
|
||||
case TYPED_TUPLE_SCOPE:
|
||||
case IMMUTABLE_TYPED_TUPLE_SCOPE: {
|
||||
|
||||
this.readers.push(this.reader);
|
||||
this.paths.push(path);
|
||||
this.reader = this.reader.readScope();
|
||||
continue;
|
||||
}
|
||||
case END_SCOPE: {
|
||||
final RowReader child = this.reader;
|
||||
this.reader = this.readers.pop();
|
||||
continue;
|
||||
}
|
||||
case INVALID:
|
||||
case MONGODB_OBJECT_ID: {
|
||||
throw new IllegalStateException(lenientFormat("unsupported layout type: %s", type));
|
||||
}
|
||||
default: {
|
||||
throw new IllegalStateException(lenientFormat("unknown layout type: %s", type));
|
||||
}
|
||||
}
|
||||
|
||||
if (this.result != Result.SUCCESS) {
|
||||
String message = lenientFormat("failed to read %s value for %s", type.layoutCode(), path);
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
|
||||
return new DataItem(this.paths, path, type.layoutCode(), this.value.get());
|
||||
}
|
||||
|
||||
if (this.readers.empty()) {
|
||||
this.reader = null;
|
||||
} else {
|
||||
this.reader = this.readers.pop();
|
||||
this.paths.pop();
|
||||
}
|
||||
}
|
||||
while (this.reader != null);
|
||||
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user