diff --git a/benchmarks-jmh/src/main/java/io/github/jbellis/jvector/bench/ParallelWriteBenchmark.java b/benchmarks-jmh/src/main/java/io/github/jbellis/jvector/bench/ParallelWriteBenchmark.java index 130f57d3e..592cea6df 100644 --- a/benchmarks-jmh/src/main/java/io/github/jbellis/jvector/bench/ParallelWriteBenchmark.java +++ b/benchmarks-jmh/src/main/java/io/github/jbellis/jvector/bench/ParallelWriteBenchmark.java @@ -21,9 +21,7 @@ import io.github.jbellis.jvector.graph.ListRandomAccessVectorValues; import io.github.jbellis.jvector.graph.NodesIterator; import io.github.jbellis.jvector.graph.RandomAccessVectorValues; -import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex; -import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndexWriter; -import io.github.jbellis.jvector.graph.disk.OrdinalMapper; +import io.github.jbellis.jvector.graph.disk.*; import io.github.jbellis.jvector.graph.disk.feature.Feature; import io.github.jbellis.jvector.graph.disk.feature.FeatureId; import io.github.jbellis.jvector.graph.disk.feature.FusedPQ; @@ -189,12 +187,17 @@ public void writeSequentialThenParallelAndVerify(Blackhole blackhole) throws IOE private void writeGraph(ImmutableGraphIndex graph, Path path, boolean parallel) throws IOException { - try (var writer = new OnDiskGraphIndexWriter.Builder(graph, path) - .withParallelWrites(parallel) - .with(nvqFeature) - .with(fusedPQFeature) - .withMapper(identityMapper) - .build()) { + try (RandomAccessOnDiskGraphIndexWriter writer = parallel ? 
+ new OnDiskParallelGraphIndexWriter.Builder(graph, path) + .with(nvqFeature) + .with(fusedPQFeature) + .withMapper(identityMapper) + .build() : + new OnDiskGraphIndexWriter.Builder(graph, path) + .with(nvqFeature) + .with(fusedPQFeature) + .withMapper(identityMapper) + .build()) { var view = graph.getView(); Map> writeSuppliers = new EnumMap<>(FeatureId.class); writeSuppliers.put(FeatureId.NVQ_VECTORS, inlineSuppliers.get(FeatureId.NVQ_VECTORS)); diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriter.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriter.java index 2ca8a3e9c..1afa18cfd 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriter.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriter.java @@ -34,8 +34,7 @@ *

* Implementations support different strategies for writing graph data, * including random access, sequential, and parallel writing modes. - * Use {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, IndexWriter)} - * or {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, Path)} + * Use {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, Path)} * factory methods to obtain appropriate builder instances. * * @see GraphIndexWriterTypes @@ -71,43 +70,23 @@ public interface GraphIndexWriter extends Closeable { * @return a builder for the specified writer type * @throws IllegalArgumentException if the type requires a specific writer type that wasn't provided */ - static AbstractGraphIndexWriter.Builder, ? extends IndexWriter> - getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, IndexWriter out) { + static AbstractGraphIndexWriter.Builder, ? extends RandomAccessWriter> + getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, Path out) throws FileNotFoundException { switch (type) { - case ON_DISK_PARALLEL: - if (!(out instanceof RandomAccessWriter)) { - throw new IllegalArgumentException("ON_DISK_PARALLEL requires a RandomAccessWriter"); - } - return new OnDiskGraphIndexWriter.Builder(graphIndex, (RandomAccessWriter) out); - case ON_DISK_SEQUENTIAL: - return new OnDiskSequentialGraphIndexWriter.Builder(graphIndex, out); + case RANDOM_ACCESS: + return new OnDiskGraphIndexWriter.Builder(graphIndex, out); + case RANDOM_ACCESS_PARALLEL: + return new OnDiskParallelGraphIndexWriter.Builder(graphIndex, out); default: - throw new IllegalArgumentException("Unknown GraphIndexWriterType: " + type); + throw new IllegalArgumentException("Unknown RandomAccess GraphIndexWriterType: " + type); } } - /** - * Factory method to obtain a builder for the specified writer type with a file Path. - *

- * This overload accepts a Path and is required for: - *

- * Other writer types should use the {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, IndexWriter)} - * overload instead. - * - * @param type the type of writer to create (currently only ON_DISK_PARALLEL is supported) - * @param graphIndex the graph index to write - * @param out the output file path - * @return a builder for the specified writer type - * @throws FileNotFoundException if the file cannot be created or opened - * @throws IllegalArgumentException if the type is not supported via this method - */ static AbstractGraphIndexWriter.Builder, ? extends IndexWriter> - getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, Path out) throws FileNotFoundException { + getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, IndexWriter out) { switch (type) { - case ON_DISK_PARALLEL: - return new OnDiskGraphIndexWriter.Builder(graphIndex, out); + case ON_DISK_SEQUENTIAL: + return new OnDiskSequentialGraphIndexWriter.Builder(graphIndex, out); default: throw new IllegalArgumentException("Unknown GraphIndexWriterType: " + type); } diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriterTypes.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriterTypes.java index b6c0b8816..7f46f8627 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriterTypes.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriterTypes.java @@ -32,11 +32,19 @@ public enum GraphIndexWriterTypes { */ ON_DISK_SEQUENTIAL, + /** + * Random-access on-disk writer backed by a synchronous RandomAccessWriter. + * Writes all data sequentially and is the current default implementation. + * Rewrites the header in place after data is written. Supports incremental + * updates via writeInline. Accepts any RandomAccessWriter. + */ + RANDOM_ACCESS, + /** + * Parallel on-disk writer that uses asynchronous I/O for improved throughput. 
* Builds records in parallel across multiple threads and writes them * asynchronously using AsynchronousFileChannel. * Requires a Path to be provided for async file channel access. */ - ON_DISK_PARALLEL + RANDOM_ACCESS_PARALLEL } diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/NodeRecordTask.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/NodeRecordTask.java index b3089f10a..a41e884ff 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/NodeRecordTask.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/NodeRecordTask.java @@ -46,6 +46,7 @@ class NodeRecordTask implements Callable> { private final int recordSize; private final long baseOffset; // Base file offset for L0 (offsets calculated per-ordinal) private final ByteBuffer buffer; + private final boolean featuresPreWritten; /** * Result of building a node record. @@ -71,7 +72,8 @@ static class Result { Map> featureStateSuppliers, int recordSize, long baseOffset, - ByteBuffer buffer) { + ByteBuffer buffer, + boolean featuresPreWritten) { this.startOrdinal = startOrdinal; this.endOrdinal = endOrdinal; this.ordinalMapper = ordinalMapper; @@ -82,6 +84,7 @@ static class Result { this.recordSize = recordSize; this.baseOffset = baseOffset; this.buffer = buffer; + this.featuresPreWritten = featuresPreWritten; } @Override @@ -91,89 +94,164 @@ public List call() throws Exception { // Reuse writer and buffer across all ordinals in this range var writer = new ByteBufferIndexWriter(buffer); + // Calculate feature size for offset adjustments when features are pre-written + int featureSize = inlineFeatures.stream().mapToInt(Feature::featureSize).sum(); + for (int newOrdinal = startOrdinal; newOrdinal < endOrdinal; newOrdinal++) { - // Calculate file offset for this ordinal - long fileOffset = baseOffset + (long) newOrdinal * recordSize; + var originalOrdinal = ordinalMapper.newToOld(newOrdinal); // Reset buffer for this ordinal 
writer.reset(); - var originalOrdinal = ordinalMapper.newToOld(newOrdinal); + if (featuresPreWritten) { + // Features already written via writeInline() - write ordinal and neighbors only + // Note: writeInline() does NOT write the ordinal, only features + + long fileOffset = baseOffset + (long) newOrdinal * recordSize; - // Write node ordinal - writer.writeInt(newOrdinal); + // Write node ordinal + writer.writeInt(newOrdinal); - // Handle OMITTED nodes (holes in ordinal space) - if (originalOrdinal == OrdinalMapper.OMITTED) { - // Write placeholder: skip inline features and write empty neighbor list - for (var feature : inlineFeatures) { - // Write zeros for missing features - for (int i = 0; i < feature.featureSize(); i++) { - writer.writeByte(0); + // Calculate offset to neighbors section (after ordinal + features) + long neighborsOffset = fileOffset + Integer.BYTES + featureSize; + + // Handle OMITTED nodes + if (originalOrdinal == OrdinalMapper.OMITTED) { + writer.writeInt(0); // neighbor count + for (int n = 0; n < graph.getDegree(0); n++) { + writer.writeInt(-1); // padding + } + } else { + // Validate node exists + if (!graph.containsNode(originalOrdinal)) { + throw new IllegalStateException( + String.format("Ordinal mapper mapped new ordinal %s to non-existing node %s", + newOrdinal, originalOrdinal)); + } + + // Write neighbors only + var neighbors = view.getNeighborsIterator(0, originalOrdinal); + if (neighbors.size() > graph.getDegree(0)) { + throw new IllegalStateException( + String.format("Node %d has more neighbors %d than the graph's max degree %d -- run Builder.cleanup()!", + originalOrdinal, neighbors.size(), graph.getDegree(0))); + } + + writer.writeInt(neighbors.size()); + int n = 0; + for (; n < neighbors.size(); n++) { + var newNeighborOrdinal = ordinalMapper.oldToNew(neighbors.nextInt()); + if (newNeighborOrdinal < 0 || newNeighborOrdinal > ordinalMapper.maxOrdinal()) { + throw new IllegalStateException( + String.format("Neighbor ordinal out 
of bounds: %d/%d", + newNeighborOrdinal, ordinalMapper.maxOrdinal())); + } + writer.writeInt(newNeighborOrdinal); + } + + // Pad to max degree + for (; n < graph.getDegree(0); n++) { + writer.writeInt(-1); } } - writer.writeInt(0); // neighbor count - for (int n = 0; n < graph.getDegree(0); n++) { - writer.writeInt(-1); // padding - } + + // We need to write two separate chunks: + // 1. Ordinal at the start of the record + // 2. Neighbors after the features + // For now, we'll create two results to handle this + ByteBuffer ordinalBuffer = ByteBuffer.allocate(Integer.BYTES); + ordinalBuffer.order(java.nio.ByteOrder.BIG_ENDIAN); + ordinalBuffer.putInt(newOrdinal); + ordinalBuffer.flip(); + results.add(new Result(newOrdinal, fileOffset, ordinalBuffer)); + + // Clone the neighbors data (everything after the ordinal in the buffer) + ByteBuffer neighborsData = writer.cloneBuffer(); + neighborsData.position(Integer.BYTES); // Skip the ordinal we already wrote + ByteBuffer neighborsCopy = ByteBuffer.allocate(neighborsData.remaining()); + neighborsCopy.put(neighborsData); + neighborsCopy.flip(); + results.add(new Result(newOrdinal, neighborsOffset, neighborsCopy)); + } else { - // Validate node exists - if (!graph.containsNode(originalOrdinal)) { - throw new IllegalStateException( - String.format("Ordinal mapper mapped new ordinal %s to non-existing node %s", - newOrdinal, originalOrdinal)); - } + // Features not pre-written - write complete record + long fileOffset = baseOffset + (long) newOrdinal * recordSize; + + // Write node ordinal + writer.writeInt(newOrdinal); - // Write inline features - for (var feature : inlineFeatures) { - var supplier = featureStateSuppliers.get(feature.id()); - if (supplier == null) { - // Write zeros for missing supplier + // Handle OMITTED nodes (holes in ordinal space) + if (originalOrdinal == OrdinalMapper.OMITTED) { + // Write placeholder: skip inline features and write empty neighbor list + for (var feature : inlineFeatures) { + // Write 
zeros for missing features for (int i = 0; i < feature.featureSize(); i++) { writer.writeByte(0); } - } else { - feature.writeInline(writer, supplier.apply(originalOrdinal)); } - } + writer.writeInt(0); // neighbor count + for (int n = 0; n < graph.getDegree(0); n++) { + writer.writeInt(-1); // padding + } + } else { + // Validate node exists + if (!graph.containsNode(originalOrdinal)) { + throw new IllegalStateException( + String.format("Ordinal mapper mapped new ordinal %s to non-existing node %s", + newOrdinal, originalOrdinal)); + } - // Write neighbors - var neighbors = view.getNeighborsIterator(0, originalOrdinal); - if (neighbors.size() > graph.getDegree(0)) { - throw new IllegalStateException( - String.format("Node %d has more neighbors %d than the graph's max degree %d -- run Builder.cleanup()!", - originalOrdinal, neighbors.size(), graph.getDegree(0))); - } + // Write inline features + for (var feature : inlineFeatures) { + var supplier = featureStateSuppliers.get(feature.id()); + if (supplier == null) { + // Write zeros for missing supplier + for (int i = 0; i < feature.featureSize(); i++) { + writer.writeByte(0); + } + } else { + feature.writeInline(writer, supplier.apply(originalOrdinal)); + } + } - writer.writeInt(neighbors.size()); - int n = 0; - for (; n < neighbors.size(); n++) { - var newNeighborOrdinal = ordinalMapper.oldToNew(neighbors.nextInt()); - if (newNeighborOrdinal < 0 || newNeighborOrdinal > ordinalMapper.maxOrdinal()) { + // Write neighbors + var neighbors = view.getNeighborsIterator(0, originalOrdinal); + if (neighbors.size() > graph.getDegree(0)) { throw new IllegalStateException( - String.format("Neighbor ordinal out of bounds: %d/%d", - newNeighborOrdinal, ordinalMapper.maxOrdinal())); + String.format("Node %d has more neighbors %d than the graph's max degree %d -- run Builder.cleanup()!", + originalOrdinal, neighbors.size(), graph.getDegree(0))); + } + + writer.writeInt(neighbors.size()); + int n = 0; + for (; n < neighbors.size(); 
n++) { + var newNeighborOrdinal = ordinalMapper.oldToNew(neighbors.nextInt()); + if (newNeighborOrdinal < 0 || newNeighborOrdinal > ordinalMapper.maxOrdinal()) { + throw new IllegalStateException( + String.format("Neighbor ordinal out of bounds: %d/%d", + newNeighborOrdinal, ordinalMapper.maxOrdinal())); + } + writer.writeInt(newNeighborOrdinal); + } + + // Pad to max degree + for (; n < graph.getDegree(0); n++) { + writer.writeInt(-1); } - writer.writeInt(newNeighborOrdinal); } - // Pad to max degree - for (; n < graph.getDegree(0); n++) { - writer.writeInt(-1); + // Verify we wrote exactly the expected amount + if (writer.bytesWritten() != recordSize) { + throw new IllegalStateException( + String.format("Record size mismatch for ordinal %d: expected %d bytes, wrote %d bytes", + newOrdinal, recordSize, writer.bytesWritten())); } - } - // Verify we wrote exactly the expected amount - if (writer.bytesWritten() != recordSize) { - throw new IllegalStateException( - String.format("Record size mismatch for ordinal %d: expected %d bytes, wrote %d bytes", - newOrdinal, recordSize, writer.bytesWritten())); + // Writer handles flip, copy, and reset internally + // The copy ensures thread-local buffer can be safely reused for the next ordinal + ByteBuffer dataCopy = writer.cloneBuffer(); + results.add(new Result(newOrdinal, fileOffset, dataCopy)); } - - // Writer handles flip, copy, and reset internally - // The copy ensures thread-local buffer can be safely reused for the next ordinal - ByteBuffer dataCopy = writer.cloneBuffer(); - results.add(new Result(newOrdinal, fileOffset, dataCopy)); } return results; diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexWriter.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexWriter.java index 2f2b48f5b..c3feda6c4 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexWriter.java +++ 
b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskGraphIndexWriter.java @@ -16,7 +16,6 @@ package io.github.jbellis.jvector.graph.disk; -import io.github.jbellis.jvector.annotations.Experimental; import io.github.jbellis.jvector.disk.BufferedRandomAccessWriter; import io.github.jbellis.jvector.disk.RandomAccessWriter; import io.github.jbellis.jvector.graph.ImmutableGraphIndex; @@ -35,14 +34,14 @@ * Writes a graph index to disk in a format that can be loaded as an OnDiskGraphIndex. *

* The serialization process follows these steps: - * + *

* 1. File Layout: * - CommonHeader: Contains version, dimension, entry node, and layer information * - Header with Features: Contains feature-specific headers * - Layer 0 data: Contains node ordinals, inline features, and edges for all nodes * - Higher layer data (levels 1..N): Contains sparse node ordinals and edges * - Separated features: Contains feature data stored separately from nodes - * + *

* 2. Serialization Process: * - First, a placeholder header is written to reserve space * - For each node in layer 0: @@ -55,22 +54,17 @@ * - For each separated feature: * - Write feature data for all nodes sequentially * - Finally, rewrite the header with correct offsets - * + *

* 3. Ordinal Mapping: * - The writer uses an OrdinalMapper to map between original node IDs and * the sequential IDs used in the on-disk format * - This allows for compaction (removing "holes" from deleted nodes) * - It also enables custom ID mapping schemes for specific use cases - * + *

* The class supports incremental writing through the writeInline method, which * allows writing features for individual nodes without writing the entire graph. */ -public class OnDiskGraphIndexWriter extends AbstractGraphIndexWriter { - private final long startOffset; - private volatile boolean useParallelWrites = false; - private final Path filePath; // Required for parallel writes - private final int parallelWorkerThreads; - private final boolean parallelUseDirectBuffers; +public class OnDiskGraphIndexWriter extends RandomAccessOnDiskGraphIndexWriter { /** * Constructs an OnDiskGraphIndexWriter with all parameters including optional file path @@ -83,9 +77,6 @@ public class OnDiskGraphIndexWriter extends AbstractGraphIndexWriter features, - Path filePath, - int parallelWorkerThreads, - boolean parallelUseDirectBuffers) + EnumMap features) { - super(randomAccessWriter, version, graph, oldToNewOrdinals, dimension, features); - this.startOffset = startOffset; - this.filePath = filePath; - this.parallelWorkerThreads = parallelWorkerThreads; - this.parallelUseDirectBuffers = parallelUseDirectBuffers; + super(randomAccessWriter, version, startOffset, graph, oldToNewOrdinals, dimension, features); } /** - * Constructs an OnDiskGraphIndexWriter without a file path. - * Parallel writes will not be available without a file path. - * Uses default parallel write configuration. 
- * - * @param randomAccessWriter the writer to use for output - * @param version the format version to write - * @param startOffset the starting offset in the file - * @param graph the graph to write - * @param oldToNewOrdinals mapper for ordinal renumbering - * @param dimension the vector dimension - * @param features the features to include - */ - OnDiskGraphIndexWriter(RandomAccessWriter randomAccessWriter, - int version, - long startOffset, - ImmutableGraphIndex graph, - OrdinalMapper oldToNewOrdinals, - int dimension, - EnumMap features) - { - this(randomAccessWriter, version, startOffset, graph, oldToNewOrdinals, dimension, features, null, 0, false); - } - - /** - * Close the view and the output stream. Unlike the super method, for backwards compatibility reasons, - * this method assumes ownership of the output stream. + * Writes L0 records sequentially */ @Override - public synchronized void close() throws IOException { - out.close(); - } - - /** - * Caller should synchronize on this OnDiskGraphIndexWriter instance if mixing usage of the - * output with calls to any of the synchronized methods in this class. - *

- * Provided for callers (like Cassandra) that want to add their own header/footer to the output. - */ - public RandomAccessWriter getOutput() { - return out; - } - - /** - * Write the inline features of the given ordinal to the output at the correct offset. - * Nothing else is written (no headers, no edges). The output IS NOT flushed. - *

- * Note: the ordinal given is implicitly a "new" ordinal in the sense of the OrdinalMapper, - * but since no nodes or edges are involved (we just write the given State to the index file), - * the mapper is not invoked. - */ - public synchronized void writeInline(int ordinal, Map stateMap) throws IOException - { - for (var featureId : stateMap.keySet()) { - if (!featureMap.containsKey(featureId)) { - throw new IllegalArgumentException(String.format("Feature %s not configured for index", featureId)); - } - } - - out.seek(featureOffsetForOrdinal(ordinal)); - - for (var feature : inlineFeatures) { - var state = stateMap.get(feature.id()); - if (state == null) { - out.seek(out.position() + feature.featureSize()); - } else { - feature.writeInline(out, state); - } - } - - maxOrdinalWritten = Math.max(maxOrdinalWritten, ordinal); - } - - private long featureOffsetForOrdinal(int ordinal) { - return super.featureOffsetForOrdinal(startOffset, ordinal); - } - - public synchronized void write(Map> featureStateSuppliers) throws IOException - { - if (graph instanceof OnHeapGraphIndex) { - var ohgi = (OnHeapGraphIndex) graph; - if (ohgi.getDeletedNodes().cardinality() > 0) { - throw new IllegalArgumentException("Run builder.cleanup() before writing the graph"); - } - } - for (var featureId : featureStateSuppliers.keySet()) { - if (!featureMap.containsKey(featureId)) { - throw new IllegalArgumentException(String.format("Feature %s not configured for index", featureId)); - } - } - if (ordinalMapper.maxOrdinal() < graph.size(0) - 1) { - var msg = String.format("Ordinal mapper from [0..%d] does not cover all nodes in the graph of size %d", - ordinalMapper.maxOrdinal(), graph.size(0)); - throw new IllegalStateException(msg); - } - - var view = graph.getView(); - - writeHeader(view); // sets position to start writing features - - // Write L0 records (either parallel or sequential) - if (useParallelWrites) { - writeL0RecordsParallel(featureStateSuppliers); - } else { - 
writeL0RecordsSequential(view, featureStateSuppliers); - } - - // We will use the abstract method because no random access is needed - writeSparseLevels(view, featureStateSuppliers); - - // We will use the abstract method because no random access is needed - writeSeparatedFeatures(featureStateSuppliers); - - if (version >= 5) { - writeFooter(view, out.position()); - } - final var endOfGraphPosition = out.position(); - - // Write the header again with updated offsets - writeHeader(view); - out.seek(endOfGraphPosition); - out.flush(); - view.close(); - } - - /** - * Writes L0 records using parallel workers with asynchronous file I/O. - *

- * Records are written asynchronously using AsynchronousFileChannel for improved throughput - * while maintaining correct ordering. This method parallelizes record building across - * multiple threads and writes results in sequential order. - *

- * Requires filePath to have been provided during construction. - * - * @param featureStateSuppliers suppliers for feature state data - * @throws IOException if an I/O error occurs - */ - @Experimental - private void writeL0RecordsParallel(Map> featureStateSuppliers) throws IOException { - if (filePath == null) { - throw new IllegalStateException("Parallel writes require a file path. Use Builder(ImmutableGraphIndex, Path) constructor."); - } - - // Flush writer before async writes to ensure buffered data is on disk - // This is critical when using AsynchronousFileChannel in parallel with BufferedRandomAccessWriter - out.flush(); - long baseOffset = out.position(); - - var config = new ParallelGraphWriter.Config( - parallelWorkerThreads, - parallelUseDirectBuffers, - 4 // Default task multiplier (4x cores) - ); - - try (var parallelWriter = new ParallelGraphWriter( - out, - graph, - inlineFeatures, - config, - filePath)) { - - parallelWriter.writeL0Records( - ordinalMapper, - inlineFeatures, - featureStateSuppliers, - baseOffset - ); - - // Update maxOrdinalWritten - maxOrdinalWritten = ordinalMapper.maxOrdinal(); - - // Seek to end of L0 region - long endOffset = baseOffset + (long) (ordinalMapper.maxOrdinal() + 1) * parallelWriter.getRecordSize(); - out.seek(endOffset); - } - } - - /** - * Writes L0 records sequentially (original implementation). - */ - private void writeL0RecordsSequential(ImmutableGraphIndex.View view, + protected void writeL0Records(ImmutableGraphIndex.View view, Map> featureStateSuppliers) throws IOException { // for each graph node, write the associated features, followed by its neighbors at L0 for (int newOrdinal = 0; newOrdinal <= ordinalMapper.maxOrdinal(); newOrdinal++) { @@ -345,64 +153,14 @@ private void writeL0RecordsSequential(ImmutableGraphIndex.View view, } } - /** - * Write the index header and completed edge lists to the given output. 
- * Unlike the super method, this method flushes the output and also assumes it's using a RandomAccessWriter that can - * seek to the startOffset and re-write the header. - * @throws IOException if there is an error writing the header - */ - public synchronized void writeHeader(ImmutableGraphIndex.View view) throws IOException { - // graph-level properties - out.seek(startOffset); - super.writeHeader(view, startOffset); - out.flush(); - } - - /** CRC32 checksum of bytes written since the starting offset - * Note on parallel writes and footer handling: - * When parallel writes are enabled (via {@link #setParallelWrites(boolean)}), it is the caller's responsibility - * to ensure that all parallel write operations have fully completed before writing the footer (e.g., checksum). - * The footer must only be written after all data has been flushed and no further writes are in progress, - * to avoid data corruption or incomplete checksums. This class does not currently coordinate or synchronize - * footer writing with parallel operations. - */ - public synchronized long checksum() throws IOException { - long endOffset = out.position(); - return out.checksum(startOffset, endOffset); - } - - /** - * Enables parallel writes for L0 records. This can significantly improve throughput - * for large graphs by parallelizing record building across multiple cores. - *

- * @param enabled whether to enable parallel writes - */ - public void setParallelWrites(boolean enabled) { - this.useParallelWrites = enabled; - } - /** * Builder for {@link OnDiskGraphIndexWriter}, with optional features. */ public static class Builder extends AbstractGraphIndexWriter.Builder { private long startOffset = 0L; - private boolean useParallelWrites = false; - /** - * The current implementation of this Builder allows for a RandomAccessWriter to be passed to the constructor in - * order to allow the backing of any IndexWriter and not tying to a particular implementation. However in this - * case the class we are in is literally named "OnDiskGraphIndexWriter" and is built to store the graph index - * on a file on disk. As RandomAccessWriter does not allow for the extraction of the backing Path there is no way - * to use async i/o to write the file without modifying the way the OnDiskGraphIndexWriter.Builder is constructed. - * Hence the addition here of a Path variable. In the future it would be an optimization to deprecate the constructor - * that uses a RandomAccessWriter and only allow the one that takes a Path. For now this allows for backwards compatibility. - */ - private Path filePath = null; - private int parallelWorkerThreads = 0; - private boolean parallelUseDirectBuffers = false; public Builder(ImmutableGraphIndex graphIndex, Path outPath) throws FileNotFoundException { this(graphIndex, new BufferedRandomAccessWriter(outPath)); - this.filePath = outPath; } public Builder(ImmutableGraphIndex graphIndex, RandomAccessWriter out) { @@ -418,45 +176,9 @@ public Builder withStartOffset(long startOffset) { return this; } - /** - * Enable parallel writes for L0 records. Can significantly improve throughput for large graphs. - */ - public Builder withParallelWrites(boolean enabled) { - this.useParallelWrites = enabled; - return this; - } - - /** - * Set the number of worker threads for parallel writes. 
- * - * @param workerThreads number of worker threads (0 = use available processors) - * @return this builder - */ - public Builder withParallelWorkerThreads(int workerThreads) { - this.parallelWorkerThreads = workerThreads; - return this; - } - - /** - * Set whether to use direct ByteBuffers for parallel writes. - * Direct buffers can provide better performance for large records but use off-heap memory. - * - * @param useDirectBuffers whether to use direct ByteBuffers - * @return this builder - */ - public Builder withParallelDirectBuffers(boolean useDirectBuffers) { - this.parallelUseDirectBuffers = useDirectBuffers; - return this; - } - @Override protected OnDiskGraphIndexWriter reallyBuild(int dimension) { - var writer = new OnDiskGraphIndexWriter( - out, version, startOffset, graphIndex, ordinalMapper, dimension, features, filePath, - parallelWorkerThreads, parallelUseDirectBuffers - ); - writer.setParallelWrites(useParallelWrites); - return writer; + return new OnDiskGraphIndexWriter(out, version, startOffset, graphIndex, ordinalMapper, dimension, features); } } } \ No newline at end of file diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskParallelGraphIndexWriter.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskParallelGraphIndexWriter.java new file mode 100644 index 000000000..c601545c0 --- /dev/null +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/OnDiskParallelGraphIndexWriter.java @@ -0,0 +1,225 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.github.jbellis.jvector.graph.disk; + +import io.github.jbellis.jvector.annotations.Experimental; +import io.github.jbellis.jvector.disk.BufferedRandomAccessWriter; +import io.github.jbellis.jvector.disk.RandomAccessWriter; +import io.github.jbellis.jvector.graph.ImmutableGraphIndex; +import io.github.jbellis.jvector.graph.OnHeapGraphIndex; +import io.github.jbellis.jvector.graph.disk.feature.Feature; +import io.github.jbellis.jvector.graph.disk.feature.FeatureId; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Path; +import java.util.EnumMap; +import java.util.Map; +import java.util.function.IntFunction; + +/** + * Writes a graph index to disk in a format that can be loaded as an OnDiskGraphIndex. + *

+ * The serialization process follows these steps: + *

+ * 1. File Layout: + * - CommonHeader: Contains version, dimension, entry node, and layer information + * - Header with Features: Contains feature-specific headers + * - Layer 0 data: Contains node ordinals, inline features, and edges for all nodes + * - Higher layer data (levels 1..N): Contains sparse node ordinals and edges + * - Separated features: Contains feature data stored separately from nodes + *

+ * 2. Serialization Process: + * - First, a placeholder header is written to reserve space + * - For each node in layer 0: + * - Write node ordinal + * - Write inline features (vectors, quantized data, etc.) + * - Write neighbor count and neighbor ordinals + * - For each higher layer (1..N): + * - Write only nodes that exist in that layer + * - For each node: write ordinal, neighbor count, and neighbor ordinals + * - For each separated feature: + * - Write feature data for all nodes sequentially + * - Finally, rewrite the header with correct offsets + *

+ * 3. Ordinal Mapping: + * - The writer uses an OrdinalMapper to map between original node IDs and + * the sequential IDs used in the on-disk format + * - This allows for compaction (removing "holes" from deleted nodes) + * - It also enables custom ID mapping schemes for specific use cases + *

+ * The class supports incremental writing through the writeInline method, which + * allows writing features for individual nodes without writing the entire graph. + */ +public class OnDiskParallelGraphIndexWriter extends RandomAccessOnDiskGraphIndexWriter { + private volatile boolean featuresPreWritten = false; + private final Path filePath; + private final int parallelWorkerThreads; + private final boolean parallelUseDirectBuffers; + + /** + * Constructs an OnDiskParallelGraphIndexWriter with all parameters including optional file path + * and parallel write configuration. + * + * @param randomAccessWriter the writer to use for output + * @param version the format version to write + * @param startOffset the starting offset in the file + * @param graph the graph to write + * @param oldToNewOrdinals mapper for ordinal renumbering + * @param dimension the vector dimension + * @param features the features to include + * @param filePath file path required for parallel writes (can be null for sequential writes) + * @param parallelWorkerThreads number of worker threads for parallel writes (0 = use available processors) + * @param parallelUseDirectBuffers whether to use direct ByteBuffers for parallel writes + */ + OnDiskParallelGraphIndexWriter(RandomAccessWriter randomAccessWriter, + int version, + long startOffset, + ImmutableGraphIndex graph, + OrdinalMapper oldToNewOrdinals, + int dimension, + EnumMap features, + Path filePath, + int parallelWorkerThreads, + boolean parallelUseDirectBuffers) + { + super(randomAccessWriter, version, startOffset, graph, oldToNewOrdinals, dimension, features); + this.filePath = filePath; + this.parallelWorkerThreads = parallelWorkerThreads; + this.parallelUseDirectBuffers = parallelUseDirectBuffers; + } + + /** + * Override to track when features have been pre-written via writeFeaturesInline. 
+ */ + @Override + public synchronized void writeFeaturesInline(int ordinal, Map stateMap) throws IOException { + super.writeFeaturesInline(ordinal, stateMap); + featuresPreWritten = true; + } + + /** + * Writes L0 records using parallel workers with asynchronous file I/O. + *

+ * Records are written asynchronously using AsynchronousFileChannel for improved throughput + * while maintaining correct ordering. This method parallelizes record building across + * multiple threads and writes results in sequential order. + *

+ * Requires filePath to have been provided during construction. + * + * @param featureStateSuppliers suppliers for feature state data + * @throws IOException if an I/O error occurs + */ + @Experimental + @Override + protected void writeL0Records(ImmutableGraphIndex.View view, + Map> featureStateSuppliers) throws IOException { + if (filePath == null) { + throw new IllegalStateException("Parallel writes require a file path. Use Builder(ImmutableGraphIndex, Path) constructor."); + } + + // Flush writer before async writes to ensure buffered data is on disk + // This is critical when using AsynchronousFileChannel in parallel with BufferedRandomAccessWriter + out.flush(); + long baseOffset = out.position(); + + var config = new ParallelGraphWriter.Config( + parallelWorkerThreads, + parallelUseDirectBuffers, + 4 // Default task multiplier (4x cores) + ); + + try (var parallelWriter = new ParallelGraphWriter( + out, + graph, + inlineFeatures, + config, + filePath)) { + + parallelWriter.writeL0Records( + ordinalMapper, + inlineFeatures, + featureStateSuppliers, + baseOffset, + featuresPreWritten + ); + + // Update maxOrdinalWritten + maxOrdinalWritten = ordinalMapper.maxOrdinal(); + + // Seek to end of L0 region + long endOffset = baseOffset + (long) (ordinalMapper.maxOrdinal() + 1) * parallelWriter.getRecordSize(); + out.seek(endOffset); + } + } + + + /** + * Builder for {@link OnDiskParallelGraphIndexWriter}, with optional features. + */ + public static class Builder extends AbstractGraphIndexWriter.Builder { + private long startOffset = 0L; + private final Path filePath; + private int parallelWorkerThreads = 0; + private boolean parallelUseDirectBuffers = false; + + public Builder(ImmutableGraphIndex graphIndex, Path outPath) throws FileNotFoundException { + super(graphIndex, new BufferedRandomAccessWriter(outPath)); + this.filePath = outPath; + } + + /** + * Set the starting offset for the graph index in the output file. 
This is useful if you want to + * append the index to an existing file. + */ + public Builder withStartOffset(long startOffset) { + this.startOffset = startOffset; + return this; + } + + /** + * Set the number of worker threads for parallel writes. + * + * @param workerThreads number of worker threads (0 = use available processors) + * @return this builder + */ + public Builder withParallelWorkerThreads(int workerThreads) { + this.parallelWorkerThreads = workerThreads; + return this; + } + + /** + * Set whether to use direct ByteBuffers for parallel writes. + * Direct buffers can provide better performance for large records but use off-heap memory. + * + * @param useDirectBuffers whether to use direct ByteBuffers + * @return this builder + */ + public Builder withParallelDirectBuffers(boolean useDirectBuffers) { + this.parallelUseDirectBuffers = useDirectBuffers; + return this; + } + + @Override + protected OnDiskParallelGraphIndexWriter reallyBuild(int dimension) { + return new OnDiskParallelGraphIndexWriter( + out, version, startOffset, graphIndex, ordinalMapper, dimension, features, filePath, + parallelWorkerThreads, parallelUseDirectBuffers + ); + } + } +} \ No newline at end of file diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/ParallelGraphWriter.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/ParallelGraphWriter.java index 1173b30a8..ded2a45cb 100644 --- a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/ParallelGraphWriter.java +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/ParallelGraphWriter.java @@ -166,18 +166,20 @@ public ParallelGraphWriter(RandomAccessWriter writer, * @param inlineFeatures the inline features to write * @param featureStateSuppliers suppliers for feature state * @param baseOffset the file offset where L0 records start + * @param featuresPreWritten whether features have already been written via writeInline * @throws IOException if an IO error 
occurs */ public void writeL0Records(OrdinalMapper ordinalMapper, List inlineFeatures, Map> featureStateSuppliers, - long baseOffset) throws IOException { + long baseOffset, + boolean featuresPreWritten) throws IOException { int maxOrdinal = ordinalMapper.maxOrdinal(); int totalOrdinals = maxOrdinal + 1; // Calculate optimal number of tasks based on cores and task multiplier int numCores = Runtime.getRuntime().availableProcessors(); - int numTasks = Math.min((totalOrdinals / (numCores * taskMultiplier)), totalOrdinals); + int numTasks = Math.max(1, Math.min(numCores * taskMultiplier, totalOrdinals)); // Calculate ordinals per task (ceiling division to cover all ordinals) int ordinalsPerTask = (totalOrdinals + numTasks - 1) / numTasks; @@ -211,7 +213,8 @@ public void writeL0Records(OrdinalMapper ordinalMapper, featureStateSuppliers, recordSize, baseOffset, // Base offset (task calculates per-ordinal offsets) - buffer + buffer, + featuresPreWritten ); return task.call(); diff --git a/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/RandomAccessOnDiskGraphIndexWriter.java b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/RandomAccessOnDiskGraphIndexWriter.java new file mode 100644 index 000000000..4042efae4 --- /dev/null +++ b/jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/RandomAccessOnDiskGraphIndexWriter.java @@ -0,0 +1,208 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.github.jbellis.jvector.graph.disk; + +import io.github.jbellis.jvector.disk.RandomAccessWriter; +import io.github.jbellis.jvector.graph.ImmutableGraphIndex; +import io.github.jbellis.jvector.graph.OnHeapGraphIndex; +import io.github.jbellis.jvector.graph.disk.feature.Feature; +import io.github.jbellis.jvector.graph.disk.feature.FeatureId; + +import java.io.IOException; +import java.util.EnumMap; +import java.util.Map; +import java.util.function.IntFunction; + +/** + * Base class for graph index writers that use RandomAccessWriter for output. + *

+ * This class provides common functionality for writers that need random access + * capabilities, such as seeking to specific positions and rewriting headers. + *

+ * Subclasses include: + *

    + *
  • {@link OnDiskGraphIndexWriter} - Sequential writing with header updates
  • + *
  • {@link OnDiskParallelGraphIndexWriter} - Parallel writing with async I/O
  • + *
+ */ +public abstract class RandomAccessOnDiskGraphIndexWriter extends AbstractGraphIndexWriter { + protected final long startOffset; + + /** + * Constructs a RandomAccessOnDiskGraphIndexWriter. + * + * @param randomAccessWriter the writer to use for output + * @param version the format version to write + * @param startOffset the starting offset in the file + * @param graph the graph to write + * @param oldToNewOrdinals mapper for ordinal renumbering + * @param dimension the vector dimension + * @param features the features to include + */ + protected RandomAccessOnDiskGraphIndexWriter(RandomAccessWriter randomAccessWriter, + int version, + long startOffset, + ImmutableGraphIndex graph, + OrdinalMapper oldToNewOrdinals, + int dimension, + EnumMap features) + { + super(randomAccessWriter, version, graph, oldToNewOrdinals, dimension, features); + this.startOffset = startOffset; + } + + /** + * Close the view and the output stream. For backwards compatibility reasons, + * this method assumes ownership of the output stream. + */ + @Override + public synchronized void close() throws IOException { + out.close(); + } + + /** + * Caller should synchronize on this writer instance if mixing usage of the + * output with calls to any of the synchronized methods in this class. + *

+ * Provided for callers (like Cassandra) that want to add their own header/footer to the output. + */ + public RandomAccessWriter getOutput() { + return out; + } + + /** + * Write the inline features of the given ordinal to the output at the correct offset. + * Nothing else is written (no headers, no edges). The output IS NOT flushed. + *

+ * Note: the ordinal given is implicitly a "new" ordinal in the sense of the OrdinalMapper, + * but since no nodes or edges are involved (we just write the given State to the index file), + * the mapper is not invoked. + */ + public synchronized void writeInline(int ordinal, Map stateMap) throws IOException + { + writeFeaturesInline(ordinal, stateMap); + } + + /** + * Write the inline features of the given ordinal to the output at the correct offset. + * Nothing else is written (no headers, no edges). The output IS NOT flushed. + *

+ * Note: the ordinal given is implicitly a "new" ordinal in the sense of the OrdinalMapper, + * but since no nodes or edges are involved (we just write the given State to the index file), + * the mapper is not invoked. + */ + public synchronized void writeFeaturesInline(int ordinal, Map stateMap) throws IOException { + for (var featureId : stateMap.keySet()) { + if (!featureMap.containsKey(featureId)) { + throw new IllegalArgumentException(String.format("Feature %s not configured for index", featureId)); + } + } + + out.seek(featureOffsetForOrdinal(ordinal)); + + for (var feature : inlineFeatures) { + var state = stateMap.get(feature.id()); + if (state == null) { + out.seek(out.position() + feature.featureSize()); + } else { + feature.writeInline(out, state); + } + } + + maxOrdinalWritten = Math.max(maxOrdinalWritten, ordinal); + } + + public synchronized void write(Map> featureStateSuppliers) throws IOException + { + if (graph instanceof OnHeapGraphIndex) { + var ohgi = (OnHeapGraphIndex) graph; + if (ohgi.getDeletedNodes().cardinality() > 0) { + throw new IllegalArgumentException("Run builder.cleanup() before writing the graph"); + } + } + for (var featureId : featureStateSuppliers.keySet()) { + if (!featureMap.containsKey(featureId)) { + throw new IllegalArgumentException(String.format("Feature %s not configured for index", featureId)); + } + } + if (ordinalMapper.maxOrdinal() < graph.size(0) - 1) { + var msg = String.format("Ordinal mapper from [0..%d] does not cover all nodes in the graph of size %d", + ordinalMapper.maxOrdinal(), graph.size(0)); + throw new IllegalStateException(msg); + } + + var view = graph.getView(); + + writeHeader(view); // sets position to start writing features + + writeL0Records(view, featureStateSuppliers); + + // We will use the abstract method because no random access is needed + writeSparseLevels(view, featureStateSuppliers); + + // We will use the abstract method because no random access is needed + 
writeSeparatedFeatures(featureStateSuppliers); + + if (version >= 5) { + writeFooter(view, out.position()); + } + final var endOfGraphPosition = out.position(); + + // Write the header again with updated offsets + writeHeader(view); + out.seek(endOfGraphPosition); + out.flush(); + view.close(); + } + + protected abstract void writeL0Records(ImmutableGraphIndex.View view, + Map> featureStateSuppliers) throws IOException; + + /** + * Computes the file offset for the inline features of a given ordinal. + * + * @param ordinal the ordinal to compute the offset for + * @return the file offset + */ + protected long featureOffsetForOrdinal(int ordinal) { + return super.featureOffsetForOrdinal(startOffset, ordinal); + } + + /** + * Write the index header and completed edge lists to the given output. + * This method flushes the output and seeks to the startOffset to rewrite the header. + * + * @param view the graph index view + * @throws IOException if there is an error writing the header + */ + public synchronized void writeHeader(ImmutableGraphIndex.View view) throws IOException { + out.seek(startOffset); + super.writeHeader(view, startOffset); + out.flush(); + } + + /** + * CRC32 checksum of bytes written since the starting offset. 
+ * + * @return the checksum value + * @throws IOException if an I/O error occurs + */ + public synchronized long checksum() throws IOException { + long endOffset = out.position(); + return out.checksum(startOffset, endOffset); + } +} diff --git a/jvector-examples/src/main/java/io/github/jbellis/jvector/example/Grid.java b/jvector-examples/src/main/java/io/github/jbellis/jvector/example/Grid.java index be5d66d43..a216d777f 100644 --- a/jvector-examples/src/main/java/io/github/jbellis/jvector/example/Grid.java +++ b/jvector-examples/src/main/java/io/github/jbellis/jvector/example/Grid.java @@ -33,14 +33,12 @@ import io.github.jbellis.jvector.graph.GraphIndexBuilder; import io.github.jbellis.jvector.graph.GraphSearcher; import io.github.jbellis.jvector.graph.RandomAccessVectorValues; +import io.github.jbellis.jvector.graph.disk.*; import io.github.jbellis.jvector.graph.disk.feature.Feature; import io.github.jbellis.jvector.graph.disk.feature.FeatureId; import io.github.jbellis.jvector.graph.disk.feature.FusedPQ; import io.github.jbellis.jvector.graph.disk.feature.InlineVectors; import io.github.jbellis.jvector.graph.disk.feature.NVQ; -import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex; -import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndexWriter; -import io.github.jbellis.jvector.graph.disk.OrdinalMapper; import io.github.jbellis.jvector.graph.similarity.BuildScoreProvider; import io.github.jbellis.jvector.graph.similarity.DefaultSearchScoreProvider; import io.github.jbellis.jvector.graph.similarity.ScoreFunction; @@ -222,9 +220,9 @@ private static Map, ImmutableGraphIndex> buildOnDisk(List, OnDiskGraphIndexWriter> writers = new HashMap<>(); + Map, OnDiskParallelGraphIndexWriter> writers = new HashMap<>(); Map, Map>> suppliers = new HashMap<>(); - OnDiskGraphIndexWriter scoringWriter = null; + OnDiskParallelGraphIndexWriter scoringWriter = null; int n = 0; for (var features : featureSets) { var graphPath = testDirectory.resolve("graph" + n++); @@ 
-305,7 +303,7 @@ private static BuilderWithSuppliers builderWithSuppliers(Set features throws FileNotFoundException { var identityMapper = new OrdinalMapper.IdentityMapper(floatVectors.size() - 1); - var builder = new OnDiskGraphIndexWriter.Builder(onHeapGraph, outPath); + var builder = new OnDiskParallelGraphIndexWriter.Builder(onHeapGraph, outPath); builder.withMapper(identityMapper); Map> suppliers = new EnumMap<>(FeatureId.class); @@ -355,10 +353,10 @@ private static DiagnosticLevel getDiagnosticLevel() { } private static class BuilderWithSuppliers { - public final OnDiskGraphIndexWriter.Builder builder; + public final OnDiskParallelGraphIndexWriter.Builder builder; public final Map> suppliers; - public BuilderWithSuppliers(OnDiskGraphIndexWriter.Builder builder, Map> suppliers) { + public BuilderWithSuppliers(OnDiskParallelGraphIndexWriter.Builder builder, Map> suppliers) { this.builder = builder; this.suppliers = suppliers; } diff --git a/jvector-examples/src/main/java/io/github/jbellis/jvector/graph/disk/ParallelWriteExample.java b/jvector-examples/src/main/java/io/github/jbellis/jvector/graph/disk/ParallelWriteExample.java index b900de2bc..d6887460b 100644 --- a/jvector-examples/src/main/java/io/github/jbellis/jvector/graph/disk/ParallelWriteExample.java +++ b/jvector-examples/src/main/java/io/github/jbellis/jvector/graph/disk/ParallelWriteExample.java @@ -251,7 +251,6 @@ public static void benchmarkComparison(ImmutableGraphIndex graph, System.out.printf("Writing with NVQ + FUSED_ADC features...%n"); long sequentialStart = System.nanoTime(); try (var writer = new OnDiskGraphIndexWriter.Builder(graph, sequentialPath) - .withParallelWrites(false) .with(nvqFeature) .with(fusedPQFeature) .withMapper(identityMapper) @@ -270,8 +269,7 @@ public static void benchmarkComparison(ImmutableGraphIndex graph, // Parallel write long parallelStart = System.nanoTime(); - try (var writer = new OnDiskGraphIndexWriter.Builder(graph, parallelPath) - .withParallelWrites(true) + 
try (var writer = new OnDiskParallelGraphIndexWriter.Builder(graph, parallelPath) .with(nvqFeature) .with(fusedPQFeature) .withMapper(identityMapper) diff --git a/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskParallelGraphIndexWriter.java b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskParallelGraphIndexWriter.java new file mode 100644 index 000000000..4c89ceb74 --- /dev/null +++ b/jvector-tests/src/test/java/io/github/jbellis/jvector/graph/disk/TestOnDiskParallelGraphIndexWriter.java @@ -0,0 +1,312 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.github.jbellis.jvector.graph.disk; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import io.github.jbellis.jvector.LuceneTestCase; +import io.github.jbellis.jvector.TestUtil; +import io.github.jbellis.jvector.disk.SimpleMappedReader; +import io.github.jbellis.jvector.graph.GraphIndexBuilder; +import io.github.jbellis.jvector.graph.ImmutableGraphIndex; +import io.github.jbellis.jvector.graph.ListRandomAccessVectorValues; +import io.github.jbellis.jvector.graph.disk.feature.Feature; +import io.github.jbellis.jvector.graph.disk.feature.FeatureId; +import io.github.jbellis.jvector.graph.disk.feature.InlineVectors; +import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class TestOnDiskParallelGraphIndexWriter extends LuceneTestCase { + private Path testDirectory; + + @Before + public void setup() throws IOException { + testDirectory = Files.createTempDirectory(this.getClass().getSimpleName()); + } + + @After + public void tearDown() { + TestUtil.deleteQuietly(testDirectory); + } + + /** + * Tests that OnDiskParallelGraphIndexWriter produces identical output whether: + * 1. Using two-phase approach: writeFeaturesInline() then write() + * 2. 
Using single-phase approach: just write() + */ + @Test + public void testTwoPhaseVsSinglePhaseWriting() throws IOException { + // Test with single layer graph + testTwoPhaseVsSinglePhaseWriting(false); + // Test with multi-layer graph + testTwoPhaseVsSinglePhaseWriting(true); + } + + private void testTwoPhaseVsSinglePhaseWriting(boolean addHierarchy) throws IOException { + // Setup test parameters + int dimension = 16; + int size = 100; + int maxConnections = 8; + int beamWidth = 100; + float alpha = 1.2f; + float neighborOverflow = 1.2f; + + // Create random vectors and build a graph + var ravv = new ListRandomAccessVectorValues( + new ArrayList<>(TestUtil.createRandomVectors(size, dimension)), + dimension + ); + var builder = new GraphIndexBuilder( + ravv, + VectorSimilarityFunction.COSINE, + maxConnections, + beamWidth, + neighborOverflow, + alpha, + addHierarchy + ); + ImmutableGraphIndex graph = TestUtil.buildSequentially(builder, ravv); + + // Path for two-phase write + Path twoPhaseIndexPath = testDirectory.resolve("graph_two_phase_hierarchy_" + addHierarchy); + // Path for single-phase write + Path singlePhaseIndexPath = testDirectory.resolve("graph_single_phase_hierarchy_" + addHierarchy); + + // Create feature state suppliers + var suppliers = Feature.singleStateFactory( + FeatureId.INLINE_VECTORS, + nodeId -> new InlineVectors.State(ravv.getVector(nodeId)) + ); + + // TWO-PHASE APPROACH: writeFeaturesInline then write + try (var writer = new OnDiskParallelGraphIndexWriter.Builder(graph, twoPhaseIndexPath) + .with(new InlineVectors(ravv.dimension())) + .build()) { + + // Phase 1: Write inline features for all nodes + for (int ordinal = 0; ordinal < graph.size(0); ordinal++) { + Map stateMap = Map.of( + FeatureId.INLINE_VECTORS, + new InlineVectors.State(ravv.getVector(ordinal)) + ); + writer.writeFeaturesInline(ordinal, stateMap); + } + + // Phase 2: Write the complete graph (including edges and metadata) + writer.write(suppliers); + } + + // SINGLE-PHASE 
APPROACH: just write + try (var writer = new OnDiskParallelGraphIndexWriter.Builder(graph, singlePhaseIndexPath) + .with(new InlineVectors(ravv.dimension())) + .build()) { + + // Write everything in one call + writer.write(suppliers); + } + + // Verify both files are identical + assertFilesIdentical(twoPhaseIndexPath, singlePhaseIndexPath); + + // Verify both graphs load correctly and are equivalent + try (var readerSupplier1 = new SimpleMappedReader.Supplier(twoPhaseIndexPath); + var readerSupplier2 = new SimpleMappedReader.Supplier(singlePhaseIndexPath); + var onDiskGraph1 = OnDiskGraphIndex.load(readerSupplier1); + var onDiskGraph2 = OnDiskGraphIndex.load(readerSupplier2)) { + + // Both should match the original graph + TestUtil.assertGraphEquals(graph, onDiskGraph1); + TestUtil.assertGraphEquals(graph, onDiskGraph2); + + // Both should be identical to each other + TestUtil.assertGraphEquals(onDiskGraph1, onDiskGraph2); + + // Verify vectors are correct in both + try (var view1 = onDiskGraph1.getView(); + var view2 = onDiskGraph2.getView()) { + TestOnDiskGraphIndex.validateVectors(view1, ravv); + TestOnDiskGraphIndex.validateVectors(view2, ravv); + } + } + } + + /** + * Tests that parallel writing with different worker thread counts produces identical output. 
+ */ + @Test + public void testParallelWritingWithDifferentThreadCounts() throws IOException { + // Setup test parameters + int dimension = 16; + int size = 100; + int maxConnections = 8; + int beamWidth = 100; + float alpha = 1.2f; + float neighborOverflow = 1.2f; + boolean addHierarchy = false; + + // Create random vectors and build a graph + var ravv = new ListRandomAccessVectorValues( + new ArrayList<>(TestUtil.createRandomVectors(size, dimension)), + dimension + ); + var builder = new GraphIndexBuilder( + ravv, + VectorSimilarityFunction.COSINE, + maxConnections, + beamWidth, + neighborOverflow, + alpha, + addHierarchy + ); + ImmutableGraphIndex graph = TestUtil.buildSequentially(builder, ravv); + + // Create feature state suppliers + var suppliers = Feature.singleStateFactory( + FeatureId.INLINE_VECTORS, + nodeId -> new InlineVectors.State(ravv.getVector(nodeId)) + ); + + // Write with different thread counts + int[] threadCounts = {0, 1, 2, 4}; // 0 means use available processors + Path[] paths = new Path[threadCounts.length]; + + for (int i = 0; i < threadCounts.length; i++) { + paths[i] = testDirectory.resolve("graph_threads_" + threadCounts[i]); + + OnDiskParallelGraphIndexWriter.Builder writerBuilder = new OnDiskParallelGraphIndexWriter.Builder(graph, paths[i]); + writerBuilder.withParallelWorkerThreads(threadCounts[i]); + writerBuilder.with(new InlineVectors(ravv.dimension())); + + try (OnDiskParallelGraphIndexWriter writer = writerBuilder.build()) { + writer.write(suppliers); + } + } + + // Verify all files are identical + for (int i = 1; i < paths.length; i++) { + assertFilesIdentical(paths[0], paths[i]); + } + + // Verify all graphs load correctly + for (Path path : paths) { + try (var readerSupplier = new SimpleMappedReader.Supplier(path); + var onDiskGraph = OnDiskGraphIndex.load(readerSupplier)) { + TestUtil.assertGraphEquals(graph, onDiskGraph); + try (var view = onDiskGraph.getView()) { + TestOnDiskGraphIndex.validateVectors(view, ravv); + } + } 
+ } + } + + /** + * Tests two-phase writing with parallel configuration. + */ + @Test + public void testTwoPhaseParallelWriting() throws IOException { + // Setup test parameters + int dimension = 16; + int size = 100; + int maxConnections = 8; + int beamWidth = 100; + float alpha = 1.2f; + float neighborOverflow = 1.2f; + boolean addHierarchy = true; + + // Create random vectors and build a graph + var ravv = new ListRandomAccessVectorValues( + new ArrayList<>(TestUtil.createRandomVectors(size, dimension)), + dimension + ); + var builder = new GraphIndexBuilder( + ravv, + VectorSimilarityFunction.COSINE, + maxConnections, + beamWidth, + neighborOverflow, + alpha, + addHierarchy + ); + ImmutableGraphIndex graph = TestUtil.buildSequentially(builder, ravv); + + Path indexPath = testDirectory.resolve("graph_two_phase_parallel"); + + // Create feature state suppliers + var suppliers = Feature.singleStateFactory( + FeatureId.INLINE_VECTORS, + nodeId -> new InlineVectors.State(ravv.getVector(nodeId)) + ); + + // Two-phase write with parallel configuration + OnDiskParallelGraphIndexWriter.Builder writerBuilder = new OnDiskParallelGraphIndexWriter.Builder(graph, indexPath); + writerBuilder.withParallelWorkerThreads(4); + writerBuilder.withParallelDirectBuffers(true); + writerBuilder.with(new InlineVectors(ravv.dimension())); + + try (OnDiskParallelGraphIndexWriter writer = writerBuilder.build()) { + // Phase 1: Write inline features + for (int ordinal = 0; ordinal < graph.size(0); ordinal++) { + Map stateMap = Map.of( + FeatureId.INLINE_VECTORS, + new InlineVectors.State(ravv.getVector(ordinal)) + ); + writer.writeFeaturesInline(ordinal, stateMap); + } + + // Phase 2: Write complete graph with parallel workers + writer.write(suppliers); + } + + // Verify the graph loads correctly + try (var readerSupplier = new SimpleMappedReader.Supplier(indexPath); + var onDiskGraph = OnDiskGraphIndex.load(readerSupplier)) { + TestUtil.assertGraphEquals(graph, onDiskGraph); + try (var 
view = onDiskGraph.getView()) { + TestOnDiskGraphIndex.validateVectors(view, ravv); + } + } + } + + /** + * Helper method to assert that two files are byte-for-byte identical. + */ + private void assertFilesIdentical(Path path1, Path path2) throws IOException { + byte[] bytes1 = Files.readAllBytes(path1); + byte[] bytes2 = Files.readAllBytes(path2); + + assertEquals("File sizes differ", bytes1.length, bytes2.length); + assertArrayEquals( + "Files are not identical: " + path1 + " vs " + path2, + bytes1, + bytes2 + ); + } +} + +// Made with Bob