@@ -21,9 +21,7 @@
import io.github.jbellis.jvector.graph.ListRandomAccessVectorValues;
import io.github.jbellis.jvector.graph.NodesIterator;
import io.github.jbellis.jvector.graph.RandomAccessVectorValues;
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex;
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndexWriter;
import io.github.jbellis.jvector.graph.disk.OrdinalMapper;
import io.github.jbellis.jvector.graph.disk.*;
import io.github.jbellis.jvector.graph.disk.feature.Feature;
import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
import io.github.jbellis.jvector.graph.disk.feature.FusedPQ;
@@ -189,12 +187,17 @@ public void writeSequentialThenParallelAndVerify(Blackhole blackhole) throws IOException
private void writeGraph(ImmutableGraphIndex graph,
Path path,
boolean parallel) throws IOException {
try (var writer = new OnDiskGraphIndexWriter.Builder(graph, path)
.withParallelWrites(parallel)
.with(nvqFeature)
.with(fusedPQFeature)
.withMapper(identityMapper)
.build()) {
try (RandomAccessOnDiskGraphIndexWriter writer = parallel ?
new OnDiskParallelGraphIndexWriter.Builder(graph, path)
.with(nvqFeature)
.with(fusedPQFeature)
.withMapper(identityMapper)
.build() :
new OnDiskGraphIndexWriter.Builder(graph, path)
.with(nvqFeature)
.with(fusedPQFeature)
.withMapper(identityMapper)
.build()) {
var view = graph.getView();
Map<FeatureId, IntFunction<Feature.State>> writeSuppliers = new EnumMap<>(FeatureId.class);
writeSuppliers.put(FeatureId.NVQ_VECTORS, inlineSuppliers.get(FeatureId.NVQ_VECTORS));
@@ -34,8 +34,7 @@
* <p>
* Implementations support different strategies for writing graph data,
* including random access, sequential, and parallel writing modes.
* Use {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, IndexWriter)}
* or {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, Path)}
* Use {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, Path)}
* factory methods to obtain appropriate builder instances.
*
* @see GraphIndexWriterTypes
@@ -71,43 +70,23 @@ public interface GraphIndexWriter extends Closeable {
* @return a builder for the specified writer type
* @throws IllegalArgumentException if the type requires a specific writer type that wasn't provided
*/
static AbstractGraphIndexWriter.Builder<? extends AbstractGraphIndexWriter<?>, ? extends IndexWriter>
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, IndexWriter out) {
static AbstractGraphIndexWriter.Builder<? extends AbstractGraphIndexWriter<?>, ? extends RandomAccessWriter>
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, Path out) throws FileNotFoundException {
switch (type) {
case ON_DISK_PARALLEL:
if (!(out instanceof RandomAccessWriter)) {
throw new IllegalArgumentException("ON_DISK_PARALLEL requires a RandomAccessWriter");
}
return new OnDiskGraphIndexWriter.Builder(graphIndex, (RandomAccessWriter) out);
case ON_DISK_SEQUENTIAL:
return new OnDiskSequentialGraphIndexWriter.Builder(graphIndex, out);
case RANDOM_ACCESS:
return new OnDiskGraphIndexWriter.Builder(graphIndex, out);
case RANDOM_ACCESS_PARALLEL:
return new OnDiskParallelGraphIndexWriter.Builder(graphIndex, out);
default:
throw new IllegalArgumentException("Unknown GraphIndexWriterType: " + type);
throw new IllegalArgumentException("Unknown RandomAccess GraphIndexWriterType: " + type);
}
}

/**
* Factory method to obtain a builder for the specified writer type with a file Path.
* <p>
* This overload accepts a Path and is required for:
* <ul>
* <li>ON_DISK_PARALLEL - enables async I/O for improved throughput</li>
* </ul>
* Other writer types should use the {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, IndexWriter)}
* overload instead.
*
* @param type the type of writer to create (currently only ON_DISK_PARALLEL is supported)
* @param graphIndex the graph index to write
* @param out the output file path
* @return a builder for the specified writer type
* @throws FileNotFoundException if the file cannot be created or opened
* @throws IllegalArgumentException if the type is not supported via this method
*/
static AbstractGraphIndexWriter.Builder<? extends AbstractGraphIndexWriter<?>, ? extends IndexWriter>
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, Path out) throws FileNotFoundException {
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, IndexWriter out) {
switch (type) {
case ON_DISK_PARALLEL:
return new OnDiskGraphIndexWriter.Builder(graphIndex, out);
case ON_DISK_SEQUENTIAL:
return new OnDiskSequentialGraphIndexWriter.Builder(graphIndex, out);
default:
throw new IllegalArgumentException("Unknown GraphIndexWriterType: " + type);
}
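As a reading aid, here is a minimal usage sketch of the IndexWriter overload retained above. It assumes the builder's build() result implements GraphIndexWriter and that the writer exposes a write(Map<FeatureId, IntFunction<Feature.State>>) entry point matching the benchmark's writeSuppliers map; the helper name writeWithFactory, its parameters, and the wrapper class are illustrative, not part of this change.

import io.github.jbellis.jvector.graph.ImmutableGraphIndex;   // package assumed
import io.github.jbellis.jvector.graph.disk.*;                 // as imported in the benchmark above
import io.github.jbellis.jvector.graph.disk.feature.Feature;
import io.github.jbellis.jvector.graph.disk.feature.FeatureId;

import java.io.IOException;
import java.util.Map;
import java.util.function.IntFunction;

class FactoryUsageSketch {
    // Obtain a sequential writer via the factory instead of naming the concrete Builder.
    static void writeWithFactory(ImmutableGraphIndex graph,
                                 IndexWriter out,
                                 Map<FeatureId, IntFunction<Feature.State>> suppliers) throws IOException {
        try (var writer = GraphIndexWriter
                .getBuilderFor(GraphIndexWriterTypes.ON_DISK_SEQUENTIAL, graph, out)
                .build()) {
            writer.write(suppliers); // assumed entry point, mirroring the benchmark's suppliers map
        }
    }
}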
@@ -32,11 +32,19 @@ public enum GraphIndexWriterTypes {
*/
ON_DISK_SEQUENTIAL,

/**
* Random-access on-disk writer.
* Writes all data sequentially and is the current default implementation.
* Writes header as footer. Does not support incremental updates.
* Accepts any RandomAccessWriter.
*/
RANDOM_ACCESS,

/**
* Parallel on-disk writer that uses asynchronous I/O for improved throughput.
* Builds records in parallel across multiple threads and writes them
* asynchronously using AsynchronousFileChannel.
* Requires a Path to be provided for async file channel access.
*/
ON_DISK_PARALLEL
RANDOM_ACCESS_PARALLEL
}
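A sketch, under the same assumptions as above, of choosing between the two random-access types via the Path overload added in this PR. It assumes AbstractGraphIndexWriter implements GraphIndexWriter; openRandomAccessWriter and the parallel flag are illustrative names.

import io.github.jbellis.jvector.graph.ImmutableGraphIndex;   // package assumed
import io.github.jbellis.jvector.graph.disk.*;
import java.io.IOException;
import java.nio.file.Path;

class WriterTypeSelectionSketch {
    static GraphIndexWriter openRandomAccessWriter(ImmutableGraphIndex graph,
                                                   Path path,
                                                   boolean parallel) throws IOException {
        // RANDOM_ACCESS_PARALLEL builds records on worker threads and writes them
        // asynchronously via AsynchronousFileChannel; RANDOM_ACCESS is the synchronous default.
        var type = parallel ? GraphIndexWriterTypes.RANDOM_ACCESS_PARALLEL
                            : GraphIndexWriterTypes.RANDOM_ACCESS;
        return GraphIndexWriter.getBuilderFor(type, graph, path).build();
    }
}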
@@ -46,6 +46,7 @@ class NodeRecordTask implements Callable<List<NodeRecordTask.Result>> {
private final int recordSize;
private final long baseOffset; // Base file offset for L0 (offsets calculated per-ordinal)
private final ByteBuffer buffer;
private final boolean featuresPreWritten;

/**
* Result of building a node record.
@@ -71,7 +72,8 @@ static class Result {
Map<FeatureId, IntFunction<Feature.State>> featureStateSuppliers,
int recordSize,
long baseOffset,
ByteBuffer buffer) {
ByteBuffer buffer,
boolean featuresPreWritten) {
this.startOrdinal = startOrdinal;
this.endOrdinal = endOrdinal;
this.ordinalMapper = ordinalMapper;
@@ -82,6 +84,7 @@ static class Result {
this.recordSize = recordSize;
this.baseOffset = baseOffset;
this.buffer = buffer;
this.featuresPreWritten = featuresPreWritten;
}

@Override
@@ -91,89 +94,164 @@ public List<Result> call() throws Exception {
// Reuse writer and buffer across all ordinals in this range
var writer = new ByteBufferIndexWriter(buffer);

// Calculate feature size for offset adjustments when features are pre-written
int featureSize = inlineFeatures.stream().mapToInt(Feature::featureSize).sum();

for (int newOrdinal = startOrdinal; newOrdinal < endOrdinal; newOrdinal++) {
// Calculate file offset for this ordinal
long fileOffset = baseOffset + (long) newOrdinal * recordSize;
var originalOrdinal = ordinalMapper.newToOld(newOrdinal);

// Reset buffer for this ordinal
writer.reset();

var originalOrdinal = ordinalMapper.newToOld(newOrdinal);
if (featuresPreWritten) {
// Features already written via writeInline() - write ordinal and neighbors only
// Note: writeInline() does NOT write the ordinal, only features

long fileOffset = baseOffset + (long) newOrdinal * recordSize;

// Write node ordinal
writer.writeInt(newOrdinal);
// Write node ordinal
writer.writeInt(newOrdinal);

// Handle OMITTED nodes (holes in ordinal space)
if (originalOrdinal == OrdinalMapper.OMITTED) {
// Write placeholder: skip inline features and write empty neighbor list
for (var feature : inlineFeatures) {
// Write zeros for missing features
for (int i = 0; i < feature.featureSize(); i++) {
writer.writeByte(0);
// Calculate offset to neighbors section (after ordinal + features)
long neighborsOffset = fileOffset + Integer.BYTES + featureSize;

// Handle OMITTED nodes
if (originalOrdinal == OrdinalMapper.OMITTED) {
writer.writeInt(0); // neighbor count
for (int n = 0; n < graph.getDegree(0); n++) {
writer.writeInt(-1); // padding
}
} else {
// Validate node exists
if (!graph.containsNode(originalOrdinal)) {
throw new IllegalStateException(
String.format("Ordinal mapper mapped new ordinal %s to non-existing node %s",
newOrdinal, originalOrdinal));
}

// Write neighbors only
var neighbors = view.getNeighborsIterator(0, originalOrdinal);
if (neighbors.size() > graph.getDegree(0)) {
throw new IllegalStateException(
String.format("Node %d has more neighbors %d than the graph's max degree %d -- run Builder.cleanup()!",
originalOrdinal, neighbors.size(), graph.getDegree(0)));
}

writer.writeInt(neighbors.size());
int n = 0;
for (; n < neighbors.size(); n++) {
var newNeighborOrdinal = ordinalMapper.oldToNew(neighbors.nextInt());
if (newNeighborOrdinal < 0 || newNeighborOrdinal > ordinalMapper.maxOrdinal()) {
throw new IllegalStateException(
String.format("Neighbor ordinal out of bounds: %d/%d",
newNeighborOrdinal, ordinalMapper.maxOrdinal()));
}
writer.writeInt(newNeighborOrdinal);
}

// Pad to max degree
for (; n < graph.getDegree(0); n++) {
writer.writeInt(-1);
}
}
writer.writeInt(0); // neighbor count
for (int n = 0; n < graph.getDegree(0); n++) {
writer.writeInt(-1); // padding
}

// We need to write two separate chunks:
// 1. Ordinal at the start of the record
// 2. Neighbors after the features
// For now, we'll create two results to handle this
ByteBuffer ordinalBuffer = ByteBuffer.allocate(Integer.BYTES);
ordinalBuffer.order(java.nio.ByteOrder.BIG_ENDIAN);
ordinalBuffer.putInt(newOrdinal);
ordinalBuffer.flip();
results.add(new Result(newOrdinal, fileOffset, ordinalBuffer));

// Clone the neighbors data (everything after the ordinal in the buffer)
ByteBuffer neighborsData = writer.cloneBuffer();
neighborsData.position(Integer.BYTES); // Skip the ordinal we already wrote
ByteBuffer neighborsCopy = ByteBuffer.allocate(neighborsData.remaining());
neighborsCopy.put(neighborsData);
neighborsCopy.flip();
results.add(new Result(newOrdinal, neighborsOffset, neighborsCopy));

} else {
// Validate node exists
if (!graph.containsNode(originalOrdinal)) {
throw new IllegalStateException(
String.format("Ordinal mapper mapped new ordinal %s to non-existing node %s",
newOrdinal, originalOrdinal));
}
// Features not pre-written - write complete record
long fileOffset = baseOffset + (long) newOrdinal * recordSize;

// Write node ordinal
writer.writeInt(newOrdinal);

// Write inline features
for (var feature : inlineFeatures) {
var supplier = featureStateSuppliers.get(feature.id());
if (supplier == null) {
// Write zeros for missing supplier
// Handle OMITTED nodes (holes in ordinal space)
if (originalOrdinal == OrdinalMapper.OMITTED) {
// Write placeholder: skip inline features and write empty neighbor list
for (var feature : inlineFeatures) {
// Write zeros for missing features
for (int i = 0; i < feature.featureSize(); i++) {
writer.writeByte(0);
}
} else {
feature.writeInline(writer, supplier.apply(originalOrdinal));
}
}
writer.writeInt(0); // neighbor count
for (int n = 0; n < graph.getDegree(0); n++) {
writer.writeInt(-1); // padding
}
} else {
// Validate node exists
if (!graph.containsNode(originalOrdinal)) {
throw new IllegalStateException(
String.format("Ordinal mapper mapped new ordinal %s to non-existing node %s",
newOrdinal, originalOrdinal));
}

// Write neighbors
var neighbors = view.getNeighborsIterator(0, originalOrdinal);
if (neighbors.size() > graph.getDegree(0)) {
throw new IllegalStateException(
String.format("Node %d has more neighbors %d than the graph's max degree %d -- run Builder.cleanup()!",
originalOrdinal, neighbors.size(), graph.getDegree(0)));
}
// Write inline features
for (var feature : inlineFeatures) {
var supplier = featureStateSuppliers.get(feature.id());
if (supplier == null) {
// Write zeros for missing supplier
for (int i = 0; i < feature.featureSize(); i++) {
writer.writeByte(0);
}
} else {
feature.writeInline(writer, supplier.apply(originalOrdinal));
}
}

writer.writeInt(neighbors.size());
int n = 0;
for (; n < neighbors.size(); n++) {
var newNeighborOrdinal = ordinalMapper.oldToNew(neighbors.nextInt());
if (newNeighborOrdinal < 0 || newNeighborOrdinal > ordinalMapper.maxOrdinal()) {
// Write neighbors
var neighbors = view.getNeighborsIterator(0, originalOrdinal);
if (neighbors.size() > graph.getDegree(0)) {
throw new IllegalStateException(
String.format("Neighbor ordinal out of bounds: %d/%d",
newNeighborOrdinal, ordinalMapper.maxOrdinal()));
String.format("Node %d has more neighbors %d than the graph's max degree %d -- run Builder.cleanup()!",
originalOrdinal, neighbors.size(), graph.getDegree(0)));
}

writer.writeInt(neighbors.size());
int n = 0;
for (; n < neighbors.size(); n++) {
var newNeighborOrdinal = ordinalMapper.oldToNew(neighbors.nextInt());
if (newNeighborOrdinal < 0 || newNeighborOrdinal > ordinalMapper.maxOrdinal()) {
throw new IllegalStateException(
String.format("Neighbor ordinal out of bounds: %d/%d",
newNeighborOrdinal, ordinalMapper.maxOrdinal()));
}
writer.writeInt(newNeighborOrdinal);
}

// Pad to max degree
for (; n < graph.getDegree(0); n++) {
writer.writeInt(-1);
}
writer.writeInt(newNeighborOrdinal);
}

// Pad to max degree
for (; n < graph.getDegree(0); n++) {
writer.writeInt(-1);
// Verify we wrote exactly the expected amount
if (writer.bytesWritten() != recordSize) {
throw new IllegalStateException(
String.format("Record size mismatch for ordinal %d: expected %d bytes, wrote %d bytes",
newOrdinal, recordSize, writer.bytesWritten()));
}
}

// Verify we wrote exactly the expected amount
if (writer.bytesWritten() != recordSize) {
throw new IllegalStateException(
String.format("Record size mismatch for ordinal %d: expected %d bytes, wrote %d bytes",
newOrdinal, recordSize, writer.bytesWritten()));
// Writer handles flip, copy, and reset internally
// The copy ensures thread-local buffer can be safely reused for the next ordinal
ByteBuffer dataCopy = writer.cloneBuffer();
results.add(new Result(newOrdinal, fileOffset, dataCopy));
}

// Writer handles flip, copy, and reset internally
// The copy ensures thread-local buffer can be safely reused for the next ordinal
ByteBuffer dataCopy = writer.cloneBuffer();
results.add(new Result(newOrdinal, fileOffset, dataCopy));
}

return results;
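Finally, a worked sketch of the offset arithmetic NodeRecordTask relies on. The per-node layout (ordinal, inline features, neighbor count, neighbor list padded to the max degree) is read off the write sequence in call() above; the helper class and the sample numbers are illustrative.

class RecordOffsetSketch {
    // Layout per node, as written by NodeRecordTask:
    //   [int ordinal][featureSize bytes of inline features][int neighborCount][maxDegree ints]
    // so recordSize = Integer.BYTES + featureSize + Integer.BYTES + Integer.BYTES * maxDegree.

    static long recordOffset(long baseOffset, int newOrdinal, int recordSize) {
        return baseOffset + (long) newOrdinal * recordSize; // start of this node's record
    }

    static long neighborsOffset(long baseOffset, int newOrdinal, int recordSize, int featureSize) {
        // With featuresPreWritten, the task patches only the ordinal and the neighbors
        // section, skipping past the feature block already written via writeInline().
        return recordOffset(baseOffset, newOrdinal, recordSize) + Integer.BYTES + featureSize;
    }

    public static void main(String[] args) {
        int featureSize = 128, maxDegree = 32;                  // sample values
        int recordSize = 4 + featureSize + 4 + 4 * maxDegree;   // = 264 bytes per record
        System.out.println(recordOffset(0, 10, recordSize));                 // 2640
        System.out.println(neighborsOffset(0, 10, recordSize, featureSize)); // 2772
    }
}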