mirror of
https://github.com/microsoft/HybridRow.git
synced 2026-01-21 10:23:13 +00:00
Progressed on port from dotnet to java
This commit is contained in:
@@ -0,0 +1,37 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
package com.azure.data.cosmos.serialization.hybridrow.recordio;
|
||||
|
||||
public final class Record {
|
||||
|
||||
public static Record empty() {
|
||||
return new Record(0, 0);
|
||||
}
|
||||
|
||||
private int crc32;
|
||||
private int length;
|
||||
|
||||
public Record(int length, int crc32) {
|
||||
this.length = length;
|
||||
this.crc32 = crc32;
|
||||
}
|
||||
|
||||
public int crc32() {
|
||||
return this.crc32;
|
||||
}
|
||||
|
||||
public Record crc32(int value) {
|
||||
this.crc32 = value;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int length() {
|
||||
return this.length;
|
||||
}
|
||||
|
||||
public Record length(int value) {
|
||||
this.length = value;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
package com.azure.data.cosmos.serialization.hybridrow.recordio;
|
||||
|
||||
import com.azure.data.cosmos.core.Out;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.HybridRowHeader;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.HybridRowVersion;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.ISpanResizer;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.Result;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.RowBuffer;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.Layout;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.SystemSchema;
|
||||
|
||||
public final class RecordIOFormatter {
|
||||
|
||||
public static final Layout RECORD_LAYOUT = SystemSchema.layoutResolver.resolve(SystemSchema.RECORD_SCHEMA_ID);
|
||||
public static final Layout SEGMENT_LAYOUT = SystemSchema.layoutResolver.resolve(SystemSchema.SEGMENT_SCHEMA_ID);
|
||||
|
||||
public static Result FormatRecord(ReadOnlyMemory<Byte> body, Out<RowBuffer> row) {
|
||||
return FormatRecord(body, row, null);
|
||||
}
|
||||
|
||||
//C# TO JAVA CONVERTER NOTE: Java does not support optional parameters. Overloaded method(s) are created above:
|
||||
//ORIGINAL LINE: public static Result FormatRecord(ReadOnlyMemory<byte> body, out RowBuffer row,
|
||||
// ISpanResizer<byte> resizer = default)
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
public static Result FormatRecord(ReadOnlyMemory<Byte> body, Out<RowBuffer> row,
|
||||
ISpanResizer<Byte> resizer) {
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: resizer = resizer != null ? resizer : DefaultSpanResizer<byte>.Default;
|
||||
resizer = resizer != null ? resizer : DefaultSpanResizer < Byte >.Default;
|
||||
int estimatedSize = HybridRowHeader.BYTES + RecordIOFormatter.RECORD_LAYOUT.getSize() + body.Length;
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: uint crc32 = Crc32.Update(0, body.Span);
|
||||
int crc32 = Crc32.Update(0, body.Span);
|
||||
Record record = new Record(body.Length, crc32);
|
||||
return RecordIOFormatter.FormatObject(resizer, estimatedSize, RecordIOFormatter.RECORD_LAYOUT, record.clone(),
|
||||
RecordSerializer.Write, row.clone());
|
||||
}
|
||||
|
||||
public static Result FormatSegment(Segment segment, Out<RowBuffer> row) {
|
||||
return FormatSegment(segment, row, null);
|
||||
}
|
||||
|
||||
//C# TO JAVA CONVERTER NOTE: Java does not support optional parameters. Overloaded method(s) are created above:
|
||||
//ORIGINAL LINE: public static Result FormatSegment(Segment segment, out RowBuffer row, ISpanResizer<byte>
|
||||
// resizer = default)
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
public static Result FormatSegment(Segment segment, Out<RowBuffer> row, ISpanResizer<Byte> resizer) {
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: resizer = resizer != null ? resizer : DefaultSpanResizer<byte>.Default;
|
||||
resizer = resizer != null ? resizer : DefaultSpanResizer < Byte >.Default;
|
||||
int estimatedSize =
|
||||
HybridRowHeader.BYTES + RecordIOFormatter.SEGMENT_LAYOUT.getSize() + segment.comment() == null ? null :
|
||||
segment.comment().length() != null ? segment.comment().length() : 0 + segment.sdl() == null ? null :
|
||||
segment.sdl().length() != null ? segment.sdl().length() : 0 + 20;
|
||||
|
||||
return RecordIOFormatter.FormatObject(resizer, estimatedSize, RecordIOFormatter.SEGMENT_LAYOUT,
|
||||
segment.clone(), SegmentSerializer.Write, row.clone());
|
||||
}
|
||||
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: private static Result FormatObject<T>(ISpanResizer<byte> resizer, int initialCapacity, Layout
|
||||
// layout, T obj, RowWriter.WriterFunc<T> writer, out RowBuffer row)
|
||||
private static <T> Result FormatObject(ISpanResizer<Byte> resizer, int initialCapacity, Layout layout, T obj,
|
||||
RowWriter.WriterFunc<T> writer, Out<RowBuffer> row) {
|
||||
row.setAndGet(new RowBuffer(initialCapacity, resizer));
|
||||
row.get().initLayout(HybridRowVersion.V1, layout, SystemSchema.layoutResolver);
|
||||
Result r = RowWriter.WriteBuffer(row.clone(), obj, writer);
|
||||
if (r != Result.SUCCESS) {
|
||||
row.setAndGet(null);
|
||||
return r;
|
||||
}
|
||||
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,354 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
package com.azure.data.cosmos.serialization.hybridrow.recordio;
|
||||
|
||||
import com.azure.data.cosmos.core.Out;
|
||||
import com.azure.data.cosmos.core.Reference;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.HybridRowHeader;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.HybridRowVersion;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.Result;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.RowBuffer;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.SchemaId;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.io.RowReader;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.io.Segment;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.SystemSchema;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import it.unimi.dsi.fastutil.bytes.Byte2ObjectMap;
|
||||
import it.unimi.dsi.fastutil.bytes.Byte2ObjectOpenHashMap;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
public final class RecordIOParser {
|
||||
|
||||
private Record record;
|
||||
private Segment segment;
|
||||
private State state = State.values()[0];
|
||||
|
||||
/**
|
||||
* Processes one buffers worth of data possibly advancing the parser state
|
||||
*
|
||||
* @param buffer1 The buffer to consume
|
||||
* @param type Indicates the type of Hybrid Row produced in {@code record}
|
||||
* @param record If non-empty, then the body of the next record in the sequence
|
||||
* @param need The smallest number of bytes needed to advanced the parser state further
|
||||
* <p>
|
||||
* It is recommended that this method not be called again until at least this number of bytes are
|
||||
* available.
|
||||
* @param consumed The number of bytes consumed from the input buffer
|
||||
* <p>
|
||||
* This number may be less than the total buffer size if the parser moved to a new state.
|
||||
* @return {@link Result#SUCCESS} if no error has occurred;, otherwise the {@link Result} of the last error
|
||||
* encountered during parsing.
|
||||
* <p>
|
||||
* >
|
||||
*/
|
||||
@Nonnull
|
||||
public Result process(
|
||||
@Nonnull final ByteBuf buffer,
|
||||
@Nonnull final Out<ProductionType> type,
|
||||
@Nonnull final Out<ByteBuf> record,
|
||||
@Nonnull final Out<Integer> need,
|
||||
@Nonnull final Out<Integer> consumed) {
|
||||
|
||||
Result result = Result.FAILURE;
|
||||
type.set(ProductionType.NONE);
|
||||
record.set(null);
|
||||
|
||||
final int start = buffer.readerIndex();
|
||||
|
||||
switch (this.state) {
|
||||
|
||||
case STATE:
|
||||
this.state = State.NEED_SEGMENT_LENGTH;
|
||||
// TODO: C# TO JAVA CONVERTER: There is no 'goto' in Java:
|
||||
// goto case State.NeedSegmentLength;
|
||||
|
||||
case NEED_SEGMENT_LENGTH: {
|
||||
|
||||
final int minimalSegmentRowSize = HybridRowHeader.BYTES + RecordIOFormatter.SEGMENT_LAYOUT.size();
|
||||
|
||||
if (buffer.readableBytes() < minimalSegmentRowSize) {
|
||||
consumed.set(buffer.readerIndex() - start);
|
||||
need.set(minimalSegmentRowSize);
|
||||
return Result.INSUFFICIENT_BUFFER;
|
||||
}
|
||||
|
||||
ByteBuf span = buffer.slice(buffer.readerIndex(), minimalSegmentRowSize);
|
||||
RowBuffer row = new RowBuffer(span, HybridRowVersion.V1, SystemSchema.layoutResolver);
|
||||
Reference<RowBuffer> tempReference_row =
|
||||
new Reference<RowBuffer>(row);
|
||||
RowReader reader = new RowReader(tempReference_row);
|
||||
row = tempReference_row.get();
|
||||
Reference<RowReader> tempReference_reader =
|
||||
new Reference<RowReader>(reader);
|
||||
Out<Segment> tempOut_segment =
|
||||
new Out<Segment>();
|
||||
result = SegmentSerializer.read(tempReference_reader, tempOut_segment);
|
||||
this.segment = tempOut_segment.get();
|
||||
reader = tempReference_reader.get();
|
||||
if (result != Result.SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
this.state = State.NEED_SEGMENT;
|
||||
// TODO: C# TO JAVA CONVERTER: There is no 'goto' in Java:
|
||||
goto case State.NEED_SEGMENT
|
||||
}
|
||||
|
||||
case NEED_SEGMENT: {
|
||||
if (buffer.Length < this.segment.length()) {
|
||||
need.set(this.segment.length());
|
||||
consumed.set(buffer.Length - buffer.Length);
|
||||
return Result.INSUFFICIENT_BUFFER;
|
||||
}
|
||||
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: Span<byte> span = b.Span.Slice(0, this.segment.Length);
|
||||
Span<Byte> span = buffer.Span.Slice(0, this.segment.length());
|
||||
RowBuffer row = new RowBuffer(span, HybridRowVersion.V1, SystemSchema.layoutResolver);
|
||||
Reference<RowBuffer> tempReference_row2 =
|
||||
new Reference<RowBuffer>(row);
|
||||
RowReader reader = new RowReader(tempReference_row2);
|
||||
row = tempReference_row2.get();
|
||||
Reference<RowReader> tempReference_reader2 =
|
||||
new Reference<RowReader>(reader);
|
||||
Out<Segment> tempOut_segment2
|
||||
= new Out<Segment>();
|
||||
result = SegmentSerializer.read(tempReference_reader2, tempOut_segment2);
|
||||
this.segment = tempOut_segment2.get();
|
||||
reader = tempReference_reader2.get();
|
||||
if (result != Result.SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
record.set(buffer.Slice(0, span.Length));
|
||||
buffer = buffer.Slice(span.Length);
|
||||
need.set(0);
|
||||
this.state = State.NEED_HEADER;
|
||||
consumed.set(buffer.Length - buffer.Length);
|
||||
type.set(ProductionType.SEGMENT);
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
|
||||
case NEED_HEADER: {
|
||||
if (buffer.Length < HybridRowHeader.BYTES) {
|
||||
need.set(HybridRowHeader.BYTES);
|
||||
consumed.set(buffer.Length - buffer.Length);
|
||||
return Result.INSUFFICIENT_BUFFER;
|
||||
}
|
||||
|
||||
HybridRowHeader header;
|
||||
// TODO: C# TO JAVA CONVERTER: The following method call contained an unresolved 'out' keyword -
|
||||
// these cannot be converted using the 'Out' helper class unless the method is within the code
|
||||
// being modified:
|
||||
MemoryMarshal.TryRead(buffer.Span, out header);
|
||||
if (header.Version != HybridRowVersion.V1) {
|
||||
result = Result.INVALID_ROW;
|
||||
break;
|
||||
}
|
||||
|
||||
if (SchemaId.opEquals(header.SchemaId,
|
||||
SystemSchema.SEGMENT_SCHEMA_ID)) {
|
||||
// TODO: C# TO JAVA CONVERTER: There is no 'goto' in Java:
|
||||
goto case State.NEED_SEGMENT
|
||||
}
|
||||
|
||||
if (SchemaId.opEquals(header.SchemaId,
|
||||
SystemSchema.RECORD_SCHEMA_ID)) {
|
||||
// TODO: C# TO JAVA CONVERTER: There is no 'goto' in Java:
|
||||
goto case State.NEED_RECORD
|
||||
}
|
||||
|
||||
result = Result.INVALID_ROW;
|
||||
break;
|
||||
}
|
||||
|
||||
case NEED_RECORD: {
|
||||
int minimalRecordRowSize = HybridRowHeader.BYTES + RecordIOFormatter.RECORD_LAYOUT.size();
|
||||
if (buffer.Length < minimalRecordRowSize) {
|
||||
need.set(minimalRecordRowSize);
|
||||
consumed.set(buffer.Length - buffer.Length);
|
||||
return Result.INSUFFICIENT_BUFFER;
|
||||
}
|
||||
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: Span<byte> span = b.Span.Slice(0, minimalRecordRowSize);
|
||||
Span<Byte> span = buffer.Span.Slice(0, minimalRecordRowSize);
|
||||
RowBuffer row = new RowBuffer(span, HybridRowVersion.V1, SystemSchema.layoutResolver);
|
||||
Reference<RowBuffer> tempReference_row3 =
|
||||
new Reference<RowBuffer>(row);
|
||||
RowReader reader = new RowReader(tempReference_row3);
|
||||
row = tempReference_row3.get();
|
||||
Reference<RowReader> tempReference_reader3 = new Reference<RowReader>(reader);
|
||||
Out<Record> tempOut_record = new Out<Record>();
|
||||
result = RecordSerializer.read(tempReference_reader3, tempOut_record);
|
||||
this.record = tempOut_record.get();
|
||||
reader = tempReference_reader3.get();
|
||||
if (result != Result.SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
||||
buffer = buffer.Slice(span.Length);
|
||||
this.state = State.NEED_ROW;
|
||||
// TODO: C# TO JAVA CONVERTER: There is no 'goto' in Java:
|
||||
goto case State.NEED_ROW
|
||||
}
|
||||
|
||||
case NEED_ROW: {
|
||||
if (buffer.Length < this.record.length()) {
|
||||
need.set(this.record.length());
|
||||
consumed.set(buffer.Length - buffer.Length);
|
||||
return Result.INSUFFICIENT_BUFFER;
|
||||
}
|
||||
|
||||
record.set(buffer.Slice(0, this.record.length()));
|
||||
|
||||
// Validate that the record has not been corrupted.
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: uint crc32 = Crc32.Update(0, record.Span);
|
||||
int crc32 = Crc32.Update(0, record.get().Span);
|
||||
if (crc32 != this.record.crc32()) {
|
||||
result = Result.INVALID_ROW;
|
||||
break;
|
||||
}
|
||||
|
||||
buffer = buffer.Slice(this.record.length());
|
||||
need.set(0);
|
||||
this.state = State.NEED_HEADER;
|
||||
consumed.set(buffer.Length - buffer.Length);
|
||||
type.set(ProductionType.RECORD);
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
this.state = State.ERROR;
|
||||
need.set(0);
|
||||
consumed.set(buffer.Length - buffer.Length);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* True if a valid segment has been parsed.
|
||||
*/
|
||||
public boolean haveSegment() {
|
||||
return this.state.value() >= State.NEED_HEADER.value();
|
||||
}
|
||||
|
||||
/**
|
||||
* If a valid segment has been parsed then current active segment, otherwise undefined.
|
||||
*/
|
||||
public Segment segment() {
|
||||
checkState(this.haveSegment());
|
||||
return this.segment;
|
||||
}
|
||||
|
||||
/**
|
||||
* Describes the type of Hybrid Rows produced by the parser.
|
||||
*/
|
||||
public enum ProductionType {
|
||||
/**
|
||||
* No hybrid row was produced. The parser needs more data.
|
||||
*/
|
||||
NONE(0),
|
||||
|
||||
/**
|
||||
* A new segment row was produced.
|
||||
*/
|
||||
SEGMENT(1),
|
||||
|
||||
/**
|
||||
* A record in the current segment was produced.
|
||||
*/
|
||||
RECORD(2);
|
||||
|
||||
public static final int BYTES = Integer.BYTES;
|
||||
|
||||
private static Int2ObjectMap<ProductionType> mappings;
|
||||
private int value;
|
||||
|
||||
ProductionType(int value) {
|
||||
this.value = value;
|
||||
mappings().put(value, this);
|
||||
}
|
||||
|
||||
public int value() {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
public static ProductionType from(int value) {
|
||||
return mappings().get(value);
|
||||
}
|
||||
|
||||
private static Int2ObjectMap<ProductionType> mappings() {
|
||||
if (mappings == null) {
|
||||
synchronized (ProductionType.class) {
|
||||
if (mappings == null) {
|
||||
mappings = new Int2ObjectOpenHashMap<>();
|
||||
}
|
||||
}
|
||||
}
|
||||
return mappings;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The states for the internal state machine.
|
||||
* Note: numerical ordering of these states matters.
|
||||
*/
|
||||
private enum State {
|
||||
STATE(
|
||||
(byte) 0, "Start: no buffers have yet been provided to the parser"),
|
||||
ERROR(
|
||||
(byte) 1, "Unrecoverable parse error encountered"),
|
||||
NEED_SEGMENT_LENGTH(
|
||||
(byte) 2, "Parsing segment header length"),
|
||||
NEED_SEGMENT(
|
||||
(byte) 3, "Parsing segment header"),
|
||||
NEED_HEADER(
|
||||
(byte) 4, "Parsing HybridRow header"),
|
||||
NEED_RECORD(
|
||||
(byte) 5, "Parsing record header"),
|
||||
NEED_ROW(
|
||||
(byte) 6, "Parsing row body");
|
||||
|
||||
public static final int BYTES = Byte.SIZE;
|
||||
|
||||
private static Byte2ObjectMap<State> mappings;
|
||||
private final String description;
|
||||
private final byte value;
|
||||
|
||||
State(byte value, String description) {
|
||||
this.description = description;
|
||||
this.value = value;
|
||||
mappings().put(value, this);
|
||||
}
|
||||
|
||||
public String description() {
|
||||
return this.description;
|
||||
}
|
||||
|
||||
public byte value() {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
public static State from(byte value) {
|
||||
return mappings().get(value);
|
||||
}
|
||||
|
||||
private static Byte2ObjectMap<State> mappings() {
|
||||
if (mappings == null) {
|
||||
synchronized (State.class) {
|
||||
if (mappings == null) {
|
||||
mappings = new Byte2ObjectOpenHashMap<>();
|
||||
}
|
||||
}
|
||||
}
|
||||
return mappings;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,368 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
package com.azure.data.cosmos.serialization.hybridrow.recordio;
|
||||
|
||||
import com.azure.data.cosmos.core.Out;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.MemorySpanResizer;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.Result;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.RowBuffer;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
public final class RecordIOStream {
|
||||
/**
|
||||
* A function that produces RecordIO record bodies.
|
||||
* <p>
|
||||
* Record bodies are returned as memory blocks. It is expected that each block is a
|
||||
* HybridRow, but any binary data is allowed.
|
||||
*
|
||||
* @param index The 0-based index of the record within the segment to be produced.
|
||||
* @return A tuple with: Success if the body was produced without error, the error code otherwise.
|
||||
* And, the byte sequence of the record body's row buffer.
|
||||
*/
|
||||
public delegate ValueTask
|
||||
|
||||
ProduceFuncAsync(long index);<(Result,ReadOnlyMemory<Byte>)>
|
||||
|
||||
/**
|
||||
* Reads an entire RecordIO stream.
|
||||
*
|
||||
* @param stm The stream to read from.
|
||||
* @param visitRecord A (required) delegate that is called once for each record.
|
||||
* <p>
|
||||
* <paramref name="visitRecord" /> is passed a {@link Memory{T}} of the byte sequence
|
||||
* of the
|
||||
* record body's row buffer.
|
||||
* </p>
|
||||
* <p>If <paramref name="visitRecord" /> returns an error then the sequence is aborted.</p>
|
||||
* @param visitSegment An (optional) delegate that is called once for each segment header.
|
||||
* <p>
|
||||
* If <paramref name="visitSegment" /> is not provided then segment headers are parsed but
|
||||
* skipped
|
||||
* over.
|
||||
* </p>
|
||||
* <p>
|
||||
* <paramref name="visitSegment" /> is passed a {@link Memory{T}} of the byte sequence of
|
||||
* the segment header's row buffer.
|
||||
* </p>
|
||||
* <p>If <paramref name="visitSegment" /> returns an error then the sequence is aborted.</p>
|
||||
* @param resizer Optional memory resizer.
|
||||
* @return Success if the stream is parsed without error, the error code otherwise.
|
||||
*/
|
||||
|
||||
public static Task<Result> ReadRecordIOAsync(Stream stm, Func<Memory<Byte>, Result> visitRecord,
|
||||
Func<Memory<Byte>, Result> visitSegment) {
|
||||
return ReadRecordIOAsync(stm, visitRecord, visitSegment, null);
|
||||
}
|
||||
|
||||
public static Task<Result> ReadRecordIOAsync(Stream stm, Func<Memory<Byte>, Result> visitRecord) {
|
||||
return ReadRecordIOAsync(stm, visitRecord, null, null);
|
||||
}
|
||||
|
||||
// TODO: C# TO JAVA CONVERTER: There is no equivalent in Java to the 'async' keyword:
|
||||
//ORIGINAL LINE: public static async Task<Result> ReadRecordIOAsync(this Stream stm, Func<Memory<byte>, Result>
|
||||
// visitRecord, Func<Memory<byte>, Result> visitSegment = default, MemorySpanResizer<byte> resizer = default)
|
||||
//C# TO JAVA CONVERTER NOTE: Java does not support optional parameters. Overloaded method(s) are created above:
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
public static Task<Result> ReadRecordIOAsync(InputStream stm,
|
||||
tangible.Func1Param<Memory<Byte>, Result> visitRecord,
|
||||
tangible.Func1Param<Memory<Byte>, Result> visitSegment,
|
||||
MemorySpanResizer<Byte> resizer) {
|
||||
checkArgument(stm != null);
|
||||
checkArgument(visitRecord != null);
|
||||
|
||||
// Create a reusable, resizable buffer if the caller didn't provide one.
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: resizer = resizer != null ? resizer : new MemorySpanResizer<byte>();
|
||||
resizer = resizer != null ? resizer : new MemorySpanResizer<Byte>();
|
||||
|
||||
RecordIOParser parser = null;
|
||||
int need = 0;
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: Memory<byte> active = resizer.Memory;
|
||||
Memory<Byte> active = resizer.getMemory();
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: Memory<byte> avail = default;
|
||||
Memory<Byte> avail = null;
|
||||
while (true) {
|
||||
checkState(avail.Length < active.Length);
|
||||
checkState(active.Length > 0);
|
||||
checkState(active.Length >= need);
|
||||
|
||||
// TODO: C# TO JAVA CONVERTER: There is no equivalent to 'await' in Java:
|
||||
int read = await stm.ReadAsync(active.Slice(avail.Length));
|
||||
if (read == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
avail = active.Slice(0, avail.Length + read);
|
||||
|
||||
// If there isn't enough data to move the parser forward then just read again.
|
||||
if (avail.Length < need) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Process the available data until no more forward progress is possible.
|
||||
while (avail.Length > 0) {
|
||||
// Loop around processing available data until we don't have anymore
|
||||
RecordIOParser.ProductionType prodType;
|
||||
Out<RecordIOParser.ProductionType> tempOut_prodType = new Out<RecordIOParser.ProductionType>();
|
||||
Memory<Byte> record;
|
||||
Out<Memory<Byte>> tempOut_record = new Out<Memory<Byte>>();
|
||||
Out<Integer> tempOut_need = new Out<Integer>();
|
||||
int consumed;
|
||||
Out<Integer> tempOut_consumed = new Out<Integer>();
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: Result r = parser.Process(avail, out RecordIOParser.ProductionType prodType, out
|
||||
// Memory<byte> record, out need, out int consumed);
|
||||
Result r = parser.process(avail, tempOut_prodType, tempOut_record, tempOut_need, tempOut_consumed);
|
||||
consumed = tempOut_consumed.get();
|
||||
need = tempOut_need.get();
|
||||
record = tempOut_record.get();
|
||||
prodType = tempOut_prodType.get();
|
||||
|
||||
if ((r != Result.SUCCESS) && (r != Result.INSUFFICIENT_BUFFER)) {
|
||||
return r;
|
||||
}
|
||||
|
||||
active = active.Slice(consumed);
|
||||
avail = avail.Slice(consumed);
|
||||
if (avail.IsEmpty) {
|
||||
active = resizer.getMemory();
|
||||
}
|
||||
|
||||
// If there wasn't enough data to move the parser forward then get more data.
|
||||
if (r == Result.INSUFFICIENT_BUFFER) {
|
||||
if (need > active.Length) {
|
||||
resizer.Resize(need, avail.Span);
|
||||
active = resizer.getMemory();
|
||||
avail = resizer.getMemory().Slice(0, avail.Length);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
// Validate the Segment
|
||||
if (prodType == RecordIOParser.ProductionType.SEGMENT) {
|
||||
checkState(!record.IsEmpty);
|
||||
r = visitSegment == null ? null : visitSegment.invoke(record) != null ?
|
||||
visitSegment.invoke(record) : Result.SUCCESS;
|
||||
if (r != Result.SUCCESS) {
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
||||
// Consume the record.
|
||||
if (prodType == RecordIOParser.ProductionType.RECORD) {
|
||||
checkState(!record.IsEmpty);
|
||||
|
||||
r = visitRecord.invoke(record);
|
||||
if (r != Result.SUCCESS) {
|
||||
return r;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we processed all of the available data.
|
||||
checkState(avail.Length == 0);
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a RecordIO segment into a stream.
|
||||
*
|
||||
* @param stm The stream to write to.
|
||||
* @param segment The segment header to write.
|
||||
* @param produce A function to produces the record bodies for the segment.
|
||||
* <p>
|
||||
* The <paramref name="produce" /> function is called until either an error is encountered or it
|
||||
* produces an empty body. An empty body terminates the segment.
|
||||
* </p>
|
||||
* <p>If <paramref name="produce" /> returns an error then the sequence is aborted.</p>
|
||||
* @param resizer Optional memory resizer for RecordIO metadata row buffers.
|
||||
* <p>
|
||||
* <em>Note:</em> This should <em>NOT</em> be the same resizer used to process any rows as both
|
||||
* blocks of memory are used concurrently.
|
||||
* </p>
|
||||
* @return Success if the stream is written without error, the error code otherwise.
|
||||
*/
|
||||
|
||||
public static Task<Result> WriteRecordIOAsync(Stream stm, Segment segment, ProduceFunc produce) {
|
||||
return WriteRecordIOAsync(stm, segment, produce, null);
|
||||
}
|
||||
|
||||
//C# TO JAVA CONVERTER NOTE: Java does not support optional parameters. Overloaded method(s) are created above:
|
||||
//ORIGINAL LINE: public static Task<Result> WriteRecordIOAsync(this Stream stm, Segment segment, ProduceFunc
|
||||
// produce, MemorySpanResizer<byte> resizer = default)
|
||||
// TODO: C# TO JAVA CONVERTER: C# to Java Converter cannot determine whether this System.IO.Stream is input or
|
||||
// output:
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
public static Task<Result> WriteRecordIOAsync(Stream stm, Segment segment, ProduceFunc produce,
|
||||
MemorySpanResizer<Byte> resizer) {
|
||||
return RecordIOStream.WriteRecordIOAsync(stm,
|
||||
segment.clone(), index ->
|
||||
{
|
||||
ReadOnlyMemory<Byte> buffer;
|
||||
Out<ReadOnlyMemory<Byte>> tempOut_buffer = new Out<ReadOnlyMemory<Byte>>();
|
||||
buffer = tempOut_buffer.get();
|
||||
return new ValueTask<(Result, ReadOnlyMemory < Byte >) > ((r,buffer))
|
||||
}, resizer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a RecordIO segment into a stream.
|
||||
*
|
||||
* @param stm The stream to write to.
|
||||
* @param segment The segment header to write.
|
||||
* @param produce A function to produces the record bodies for the segment.
|
||||
* <p>
|
||||
* The <paramref name="produce" /> function is called until either an error is encountered or it
|
||||
* produces an empty body. An empty body terminates the segment.
|
||||
* </p>
|
||||
* <p>If <paramref name="produce" /> returns an error then the sequence is aborted.</p>
|
||||
* @param resizer Optional memory resizer for RecordIO metadata row buffers.
|
||||
* <p>
|
||||
* <em>Note:</em> This should <em>NOT</em> be the same resizer used to process any rows as both
|
||||
* blocks of memory are used concurrently.
|
||||
* </p>
|
||||
* @return Success if the stream is written without error, the error code otherwise.
|
||||
*/
|
||||
|
||||
public static Task<Result> WriteRecordIOAsync(Stream stm, Segment segment, ProduceFuncAsync produce) {
|
||||
return WriteRecordIOAsync(stm, segment, produce, null);
|
||||
}
|
||||
|
||||
// TODO: C# TO JAVA CONVERTER: There is no equivalent in Java to the 'async' keyword:
|
||||
//ORIGINAL LINE: public static async Task<Result> WriteRecordIOAsync(this Stream stm, Segment segment,
|
||||
// ProduceFuncAsync produce, MemorySpanResizer<byte> resizer = default)
|
||||
//C# TO JAVA CONVERTER NOTE: Java does not support optional parameters. Overloaded method(s) are created above:
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
public static Task<Result> WriteRecordIOAsync(OutputStream stm, Segment segment, ProduceFuncAsync produce,
|
||||
MemorySpanResizer<Byte> resizer) {
|
||||
// Create a reusable, resizable buffer if the caller didn't provide one.
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: resizer = resizer != null ? resizer : new MemorySpanResizer<byte>();
|
||||
resizer = resizer != null ? resizer : new MemorySpanResizer<Byte>();
|
||||
|
||||
// Write a RecordIO stream.
|
||||
Memory<Byte> metadata;
|
||||
Out<Memory<Byte>> tempOut_metadata = new Out<Memory<Byte>>();
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: Result r = RecordIOStream.FormatSegment(segment, resizer, out Memory<byte> metadata);
|
||||
Result r = RecordIOStream.FormatSegment(segment.clone(), resizer, tempOut_metadata);
|
||||
metadata = tempOut_metadata.get();
|
||||
if (r != Result.SUCCESS) {
|
||||
return r;
|
||||
}
|
||||
|
||||
// TODO: C# TO JAVA CONVERTER: There is no equivalent to 'await' in Java:
|
||||
await stm.WriteAsync(metadata);
|
||||
|
||||
long index = 0;
|
||||
while (true) {
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: ReadOnlyMemory<byte> body;
|
||||
ReadOnlyMemory<Byte> body;
|
||||
// TODO: C# TO JAVA CONVERTER: Java has no equivalent to the C# deconstruction assignments:
|
||||
(r, body) =await produce (index++);
|
||||
if (r != Result.SUCCESS) {
|
||||
return r;
|
||||
}
|
||||
|
||||
if (body.IsEmpty) {
|
||||
break;
|
||||
}
|
||||
|
||||
Out<Memory<Byte>> tempOut_metadata2 = new Out<Memory<Byte>>();
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: r = RecordIOStream.FormatRow(body, resizer, out metadata);
|
||||
r = RecordIOStream.FormatRow(body, resizer, tempOut_metadata2);
|
||||
metadata = tempOut_metadata2.get();
|
||||
if (r != Result.SUCCESS) {
|
||||
return r;
|
||||
}
|
||||
|
||||
// Metadata and Body memory blocks should not overlap since they are both in
|
||||
// play at the same time. If they do this usually means that the same resizer
|
||||
// was incorrectly used for both. Check the resizer parameter passed to
|
||||
// WriteRecordIOAsync for metadata.
|
||||
checkState(!metadata.Span.Overlaps(body.Span));
|
||||
|
||||
// TODO: C# TO JAVA CONVERTER: There is no equivalent to 'await' in Java:
|
||||
await stm.WriteAsync(metadata);
|
||||
// TODO: C# TO JAVA CONVERTER: There is no equivalent to 'await' in Java:
|
||||
await stm.WriteAsync(body);
|
||||
}
|
||||
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute and format a record header for the given record body.
|
||||
*
|
||||
* @param body The body whose record header should be formatted.
|
||||
* @param resizer The resizer to use in allocating a buffer for the record header.
|
||||
* @param block The byte sequence of the written row buffer.
|
||||
* @return Success if the write completes without error, the error code otherwise.
|
||||
*/
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: private static Result FormatRow(ReadOnlyMemory<byte> body, MemorySpanResizer<byte> resizer, out Memory<byte> block)
|
||||
private static Result FormatRow(ReadOnlyMemory<Byte> body, MemorySpanResizer<Byte> resizer, Out<Memory<Byte>> block) {
|
||||
RowBuffer row;
|
||||
Out<RowBuffer> tempOut_row = new Out<RowBuffer>();
|
||||
Result r = RecordIOFormatter.FormatRecord(body, tempOut_row, resizer);
|
||||
row = tempOut_row.get();
|
||||
if (r != Result.SUCCESS) {
|
||||
block.setAndGet(null);
|
||||
return r;
|
||||
}
|
||||
|
||||
block.setAndGet(resizer.getMemory().Slice(0, row.Length));
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format a segment.
|
||||
*
|
||||
* @param segment The segment to format.
|
||||
* @param resizer The resizer to use in allocating a buffer for the segment.
|
||||
* @param block The byte sequence of the written row buffer.
|
||||
* @return Success if the write completes without error, the error code otherwise.
|
||||
*/
|
||||
//C# TO JAVA CONVERTER WARNING: Unsigned integer types have no direct equivalent in Java:
|
||||
//ORIGINAL LINE: private static Result FormatSegment(Segment segment, MemorySpanResizer<byte> resizer, out
|
||||
// Memory<byte> block)
|
||||
private static Result FormatSegment(Segment segment, MemorySpanResizer<Byte> resizer,
|
||||
Out<Memory<Byte>> block) {
|
||||
RowBuffer row;
|
||||
Out<RowBuffer> tempOut_row =
|
||||
new Out<RowBuffer>();
|
||||
Result r = RecordIOFormatter.FormatSegment(segment.clone(), tempOut_row, resizer);
|
||||
row = tempOut_row.get();
|
||||
if (r != Result.SUCCESS) {
|
||||
block.setAndGet(null);
|
||||
return r;
|
||||
}
|
||||
|
||||
block.setAndGet(resizer.getMemory().Slice(0, row.Length));
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* A function that produces RecordIO record bodies.
|
||||
* <p>
|
||||
* Record bodies are returned as memory blocks. It is expected that each block is a
|
||||
* HybridRow, but any binary data is allowed.
|
||||
*
|
||||
* @param index The 0-based index of the record within the segment to be produced.
|
||||
* @param buffer The byte sequence of the record body's row buffer.
|
||||
* @return Success if the body was produced without error, the error code otherwise.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface ProduceFunc {
|
||||
Result invoke(long index, Out<ReadOnlyMemory<Byte>> buffer);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
package com.azure.data.cosmos.serialization.hybridrow.recordio;
|
||||
|
||||
import com.azure.data.cosmos.core.Out;
|
||||
import com.azure.data.cosmos.core.UtfAnyString;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.Result;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.io.RowReader;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.io.RowWriter;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.TypeArgument;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
public final class RecordSerializer {
|
||||
|
||||
@Nonnull
|
||||
public static Result read(RowReader reader, Out<Record> record) {
|
||||
|
||||
Out<Integer> value = new Out<>();
|
||||
record.set(Record.empty());
|
||||
|
||||
while (reader.read()) {
|
||||
|
||||
String path = reader.path().toUtf16();
|
||||
checkState(path != null);
|
||||
Result result;
|
||||
|
||||
// TODO: use Path tokens here
|
||||
|
||||
switch (path) {
|
||||
|
||||
case "length":
|
||||
|
||||
result = reader.readInt32(value);
|
||||
record.get().length(value.get());
|
||||
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
break;
|
||||
|
||||
case "crc32":
|
||||
|
||||
result = reader.readInt32(value);
|
||||
record.get().crc32(value.get());
|
||||
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public static Result write(RowWriter writer, TypeArgument typeArg, Record record) {
|
||||
Result result = writer.writeInt32(new UtfAnyString("length"), record.length());
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
return writer.writeUInt32(new UtfAnyString("crc32"), record.crc32());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,123 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License.
|
||||
|
||||
package com.azure.data.cosmos.serialization.hybridrow.recordio;
|
||||
|
||||
import com.azure.data.cosmos.core.Out;
|
||||
import com.azure.data.cosmos.core.Utf8String;
|
||||
import com.azure.data.cosmos.core.UtfAnyString;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.HybridRowVersion;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.Result;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.RowBuffer;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.io.RowReader;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.io.RowWriter;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.io.Segment;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.LayoutResolver;
|
||||
import com.azure.data.cosmos.serialization.hybridrow.layouts.TypeArgument;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
public final class SegmentSerializer {
|
||||
|
||||
private static final UtfAnyString COMMENT = new UtfAnyString("comment");
|
||||
private static final UtfAnyString LENGTH = new UtfAnyString("length");
|
||||
private static final UtfAnyString SDL = new UtfAnyString("sdl");
|
||||
|
||||
public static Result read(ByteBuf buffer, LayoutResolver resolver, Out<Segment> segment) {
|
||||
RowReader reader = new RowReader(new RowBuffer(buffer, HybridRowVersion.V1, resolver));
|
||||
return SegmentSerializer.read(reader, segment);
|
||||
}
|
||||
|
||||
public static Result read(RowReader reader, Out<Segment> segment) {
|
||||
|
||||
segment.set(new Segment(null, null));
|
||||
|
||||
final Out<String> comment = new Out<>();
|
||||
final Out<Integer> length = new Out<>();
|
||||
final Out<String> sdl = new Out<>();
|
||||
|
||||
while (reader.read()) {
|
||||
|
||||
// TODO: Use Path tokens here.
|
||||
|
||||
switch (Objects.requireNonNull(reader.path().toUtf16())) {
|
||||
|
||||
case "length": {
|
||||
|
||||
Result result = reader.readInt32(length);
|
||||
segment.get().length(length.get());
|
||||
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
|
||||
if (reader.length() < segment.get().length()) {
|
||||
// RowBuffer isn't big enough to contain the rest of the header so just return the length
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case "comment": {
|
||||
|
||||
Result result = reader.readString(comment);
|
||||
segment.get().comment(comment.get());
|
||||
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case "sdl": {
|
||||
|
||||
Result result = reader.readString(sdl);
|
||||
segment.get().sdl(sdl.get());
|
||||
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
|
||||
public static Result write(RowWriter writer, TypeArgument typeArg, Segment segment) {
|
||||
|
||||
Result result;
|
||||
|
||||
if (segment.comment() != null) {
|
||||
result = writer.writeString(COMMENT, segment.comment());
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
if (segment.sdl() != null) {
|
||||
result = writer.writeString(SDL, segment.sdl());
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
// Defer writing the length until all other fields of the segment header are written.
|
||||
// The length is then computed based on the current size of the underlying RowBuffer.
|
||||
// Because the length field is itself fixed, writing the length can never change the length.
|
||||
|
||||
int length = writer.length();
|
||||
result = writer.writeInt32(LENGTH, length);
|
||||
if (result != Result.SUCCESS) {
|
||||
return result;
|
||||
}
|
||||
|
||||
checkState(length == writer.length());
|
||||
return Result.SUCCESS;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user