Distributed Document Processing
Overview
This guide explains how to set up and run distributed document processing for the RAG system across multiple nodes.
Distributed processing allows you to scale document indexing across multiple machines, significantly improving processing speed for large document collections.
The system uses Dask for distributed task scheduling and execution.
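Under the hood, this follows Dask's scheduler/worker model: a scheduler process coordinates the work and workers on each node connect to it through a shared scheduler file. The lines below are a minimal sketch of that pattern, not the project's actual pipeline code; process_document is a hypothetical placeholder:

# Minimal sketch of the Dask scheduler-file pattern (illustrative only)
from dask.distributed import Client

def process_document(path):
    # hypothetical placeholder for per-document work
    return {"path": path, "status": "processed"}

# connect to a running scheduler through the shared scheduler file
client = Client(scheduler_file="/path/to/scheduler.json")

# spread the work over all connected workers and collect the results
futures = client.map(process_document, ["a.pdf", "b.docx"])
results = client.gather(futures)
client.close()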
Prerequisites
Before starting, make sure you have:
multiple machines/nodes with network connectivity
a Python environment on each node
access to a shared filesystem or the ability to copy files between nodes
Setup Process
1. Prepare your configuration file
Check your processing configuration file, for example examples/process/config.yaml, and make sure it includes the distributed settings such as:
dispatcher_config:
  distributed: true
  scheduler_file: "/path/to/scheduler.json"
The scheduler_file should point to a shared location accessible by all nodes.
Other important configuration options include:
input_folder: path to your documents
output_folder: where processed results will be stored
use_fast_processors: set to true for faster processing (may reduce accuracy)
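Before launching, it can help to load the configuration and confirm these fields are present. The snippet below is a stand-alone sanity check, not part of the repository, and assumes the keys above sit at the top level of the file:

# check_config.py (hypothetical helper, not shipped with the repository)
import yaml

with open("/path/to/config.yaml") as f:
    cfg = yaml.safe_load(f)

# the distributed run needs these two settings
assert cfg["dispatcher_config"]["distributed"] is True
print("scheduler file:", cfg["dispatcher_config"]["scheduler_file"])

# paths and options described above (assumed to be top-level keys)
print("input folder:", cfg.get("input_folder"))
print("output folder:", cfg.get("output_folder"))
print("fast processors:", cfg.get("use_fast_processors", False))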
2. Install dependencies on all nodes
On each node, run:
# Clone the repository (if not already done)
git clone <repository-url>
cd mmore
# Make a virtual environment
uv venv .venv
source .venv/bin/activate
# Install dependencies
uv pip install -e .
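To confirm the environment is usable on a node before launching anything, a quick import check of the Dask runtime can be run with the virtual environment activated (a hypothetical helper, not part of the repository):

# check_env.py - run with .venv activated on each node
import dask
import distributed

print("dask:", dask.__version__)
print("distributed:", distributed.__version__)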
3. Launch the distributed processing
Step 1: Start the master node (rank 0)
bash scripts/process_distributed.sh --config-file /path/to/config.yaml --rank 0
The master node will:
start the Dask scheduler
launch a worker process
prompt you to start the processing when ready
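The scheduler writes a small JSON file at the scheduler_file path containing its network address, which the other nodes use to find it. If you want to confirm the scheduler is up before starting workers, you can peek at that file; the address field below follows Dask's scheduler-file format, but treat this as a sketch:

# inspect the scheduler file written by the master node
import json

with open("/path/to/scheduler.json") as f:
    info = json.load(f)

print("scheduler address:", info.get("address"))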
Step 2: Start worker nodes (rank > 0)
On each additional node, run:
bash scripts/process_distributed.sh --config-file /path/to/config.yaml --rank 1
Replace rank 1 with a unique rank number for each node (1, 2, 3, etc.).
The node should be ready within a few seconds.
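To verify that each worker has actually registered with the scheduler, you can query it from any node with a standard Dask client; a short hedged check:

# count the workers currently registered with the scheduler
from dask.distributed import Client

client = Client(scheduler_file="/path/to/scheduler.json")
workers = client.scheduler_info()["workers"]
print(f"{len(workers)} worker(s) connected")
client.close()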
Step 3: Begin processing
Once all nodes are running, return to the master node and type go.
The master node then crawls the input folder, splits the workload among the connected nodes, and starts their work.
At the end of processing, the Dask scheduler is automatically shut down by the master node, which also stops the Dask workers on all connected nodes.
Output Structure
After processing completes, the output will be organized as follows:
output_folder/
├── processors/
│   ├── Processor_type_1/
│   │   └── results.jsonl
│   ├── Processor_type_2/
│   │   └── results.jsonl
│   └── ...
├── merged/
│   └── merged_results.jsonl
└── images/
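Each results.jsonl file contains one JSON record per processed document. Below is a small sketch for loading the merged results, assuming standard JSON Lines (the exact record fields depend on the processors used):

# load the merged results into a list of dicts
import json

records = []
with open("output_folder/merged/merged_results.jsonl") as f:
    for line in f:
        if line.strip():
            records.append(json.loads(line))

print(f"loaded {len(records)} processed documents")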
Troubleshooting
Workers not connecting: ensure all nodes can access the scheduler file location
Processing errors: check the logs on the master node
Performance issues: adjust batch sizes and worker counts in the configuration
Advanced Configuration
For optimal performance, consider adjusting:
processor batch sizes
number of threads per worker
memory limits for workers
Refer to the Process documentation for more details on configuration options.