LavenderDataLoader - Data Parallelism

For data parallelism, set the rank parameter to the rank of the current node. The server uses the world_size and rank parameters to decide whether to create a new iteration or return an existing one, so a single iteration object is shared among all ranks.

import os

dataloader = LavenderDataLoader(
    dataset_id=dataset.id,
    shardsets=[shardset.id],
    # Environment variables are strings, so convert them to integers.
    rank=int(os.environ["RANK"]),
    world_size=int(os.environ["WORLD_SIZE"]),
)
retrieved = list(dataloader)
len(retrieved)  # len(dataset) // world_size
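
To make the per-rank sample count concrete, here is a minimal sketch of one way a dataset could be divided among ranks. The round-robin rule and the shard_indices helper are assumptions made for illustration only; the server may assign samples to ranks differently.

```python
def shard_indices(num_samples: int, rank: int, world_size: int) -> list[int]:
    """Return the sample indices for one rank under a round-robin split.

    Hypothetical helper: the real server-side assignment is not documented here.
    """
    if not 0 <= rank < world_size:
        raise ValueError(f"rank must be in [0, {world_size}), got {rank}")
    return list(range(rank, num_samples, world_size))


num_samples, world_size = 10, 4
shards = [shard_indices(num_samples, r, world_size) for r in range(world_size)]

for r, shard in enumerate(shards):
    print(f"rank {r}: {shard}")
```

Under this assumed split, every sample goes to exactly one rank, and each rank holds at least num_samples // world_size samples.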

Dynamic World Size

If you don’t know the world size beforehand, set the wait_participant_threshold parameter. The server then waits wait_participant_threshold seconds for other nodes to join the iteration. Nodes that join after the threshold get a new iteration.

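dataloader = LavenderDataLoader(
    dataset_id=dataset.id,
    shardsets=[shardset.id],
    # Environment variables are strings, so convert the rank to an integer.
    rank=int(os.environ["RANK"]),
    # Seconds to wait for other nodes to join the iteration.
    wait_participant_threshold=10,
)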
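
The join window can be pictured with a small simulation. This is only a sketch: the group_by_join_window helper is hypothetical, and it assumes that the window opens when the first node joins and that a late node opens a fresh window of its own. The actual server logic may differ.

```python
def group_by_join_window(join_times: dict[int, float], threshold: float) -> list[list[int]]:
    """Group ranks into iterations by their join time in seconds.

    Hypothetical model: a rank that joins more than `threshold` seconds after
    the current window opened starts a new iteration.
    """
    groups: list[list[int]] = []
    window_start = None
    for rank, joined_at in sorted(join_times.items(), key=lambda item: item[1]):
        if window_start is None or joined_at - window_start > threshold:
            groups.append([rank])  # late joiner: a new iteration is created
            window_start = joined_at
        else:
            groups[-1].append(rank)  # joined in time: shares the iteration
    return groups


# Ranks 0-2 join within 10 seconds of the first node; rank 3 joins too late.
iterations = group_by_join_window({0: 0.0, 1: 3.0, 2: 9.5, 3: 14.0}, threshold=10)
print(iterations)
```

In this model the first three ranks share one iteration, while rank 3 ends up alone in a second one, so the threshold should be long enough to cover the slowest node's startup.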

Replication PG

For context parallelism, set the replication_pg parameter. It is a list of lists of integers, where each inner list is one partition of the ranks. For example, [[0, 1], [2, 3]] means that ranks 0 and 1 are in the first partition and ranks 2 and 3 are in the second. Ranks within the same partition receive the same samples.

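dataloader = LavenderDataLoader(
    dataset_id=dataset.id,
    shardsets=[shardset.id],
    # Environment variables are strings, so convert the rank to an integer.
    rank=int(os.environ["RANK"]),
    # Ranks 0 and 1 share samples; ranks 2 and 3 share samples.
    replication_pg=[[0, 1], [2, 3]],
)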
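
The effect of the partitions can be sketched in plain Python. The group_index and samples_for_rank helpers below are hypothetical, and the round-robin split across partitions is an assumption for illustration; only the rule that ranks in the same partition see the same samples comes from the description above.

```python
def group_index(rank: int, replication_pg: list[list[int]]) -> int:
    """Return the index of the partition that contains `rank`."""
    for index, group in enumerate(replication_pg):
        if rank in group:
            return index
    raise ValueError(f"rank {rank} is not in any replication group")


def samples_for_rank(rank: int, num_samples: int, replication_pg: list[list[int]]) -> list[int]:
    """Assumed split: samples are dealt round-robin to partitions, not to ranks."""
    index = group_index(rank, replication_pg)
    return list(range(index, num_samples, len(replication_pg)))


replication_pg = [[0, 1], [2, 3]]
per_rank = {rank: samples_for_rank(rank, 8, replication_pg) for rank in range(4)}

for rank, samples in per_rank.items():
    print(f"rank {rank}: {samples}")
```

Ranks 0 and 1 print the same indices, as do ranks 2 and 3, and the two partitions do not overlap.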