import importlib
import json
import logging
import os
from easyfl.datasets.dataset import FederatedTensorDataset
from easyfl.datasets.utils.base_dataset import BaseDataset, CIFAR10, CIFAR100
from easyfl.datasets.utils.util import load_dict
logger = logging.getLogger(__name__)
def read_dir(data_dir):
    """Load and merge every ``.json`` file located directly inside a directory.

    Each file must contain a JSON object with a ``user_data`` mapping and may
    contain a ``hierarchies`` list. Files are processed in sorted filename order,
    because ``os.listdir`` reports entries in arbitrary order and the order of
    the returned groups (and the winner among duplicated client ids) would
    otherwise vary between runs and between directories.

    Args:
        data_dir (str): The directory that contains the ``.json`` files.

    Returns:
        list[str]: Sorted client ids, i.e., the keys of the merged ``user_data``.
        list: The ``hierarchies`` entries of all files, concatenated in sorted filename order.
        dict: The merged ``user_data`` of all files. When a client id appears in several files,
            the entry from the file that sorts last is kept.

    Raises:
        KeyError: If a ``.json`` file has no ``user_data`` key.
    """
    groups = []
    data = {}
    # Sort the filenames: os.listdir gives no ordering guarantee.
    json_files = sorted(name for name in os.listdir(data_dir) if name.endswith('.json'))
    for name in json_files:
        file_path = os.path.join(data_dir, name)
        with open(file_path, 'r', encoding='utf-8') as inf:
            cdata = json.load(inf)
        if 'hierarchies' in cdata:
            groups.extend(cdata['hierarchies'])
        data.update(cdata['user_data'])
    # Client ids are derived from the merged data; the per-file `users` lists are not consulted.
    clients = sorted(data)
    return clients, groups, data
def read_data(dataset_name, train_data_dir, test_data_dir):
    """Read the training and testing splits of a dataset from disk.

    Args:
        dataset_name (str): The name of the dataset.
        train_data_dir (str): The location of the training data.
        test_data_dir (str): The location of the testing data.

    Returns:
        list[str]: Client ids; empty for the CIFAR datasets.
        list[str]: Group ids for datasets with hierarchies; empty for the CIFAR datasets.
        dict: Training data, e.g., {"id1": {"x": data, "y": label}, "id2": {"x": data, "y": label}}.
        dict: Testing data. Same format as the training data for FEMNIST and Shakespeare datasets.
            For CIFAR datasets, the format is {"x": data, "y": label}, for centralized testing in the server.

    Raises:
        AssertionError: If the client ids or the group ids read from the two
            directories are not equal (non-CIFAR datasets only).
    """
    is_cifar = dataset_name in (CIFAR10, CIFAR100)
    if not is_cifar:
        # These directories hold `json` files with keys `users` and `user_data`.
        train_ids, train_hierarchies, train_set = read_dir(train_data_dir)
        test_ids, test_hierarchies, test_set = read_dir(test_data_dir)
        assert train_ids == test_ids
        assert train_hierarchies == test_hierarchies
        return train_ids, train_hierarchies, train_set, test_set

    # CIFAR splits are loaded via `load_dict` and carry no client or group ids.
    train_set = load_dict(train_data_dir)
    test_set = load_dict(test_data_dir)
    return [], [], train_set, test_set
def load_data(root,
              dataset_name,
              num_of_clients,
              split_type,
              min_size,
              class_per_client,
              data_amount,
              iid_fraction,
              user,
              train_test_split,
              quantity_weights,
              alpha):
    """Simulate and load federated datasets.

    The train/test directories for the requested setting are looked up under
    ``<root>/<dataset_name>/<setting>``. When either is missing, the dataset class
    is instantiated, a packaged download is attempted (failures are only logged),
    and the data are generated through ``setup()`` and, if the training directory
    is still absent, ``sampling()``.

    Args:
        root (str): The root directory where datasets stored.
        dataset_name (str): The name of the dataset. It currently supports: femnist, shakespeare, cifar10, and cifar100.
            Among them, femnist and shakespeare are adopted from LEAF benchmark.
        num_of_clients (int): The targeted number of clients to construct.
        split_type (str): The type of statistical simulation, options: iid, niid, dir, and class.
            `iid` means independent and identically distributed data.
            `niid` means non-independent and identically distributed data for Femnist and Shakespeare.
            `dir` means using Dirichlet process to simulate non-iid data, for CIFAR-10 and CIFAR-100 datasets.
            `class` means partitioning the dataset by label classes, for datasets like CIFAR-10, CIFAR-100.
        min_size (int): The minimal number of samples in each client.
            It is applicable for LEAF datasets and dir simulation of CIFAR-10 and CIFAR-100.
        class_per_client (int): The number of classes in each client. Only applicable when the split_type is 'class'.
        data_amount (float): The fraction of data sampled for LEAF datasets.
            e.g., 10% means that only 10% of total dataset size are used.
        iid_fraction (float): The fraction of the number of clients used when the split_type is 'iid'.
        user (bool): A flag to indicate whether partition users of the dataset into train-test groups.
            Only applicable to LEAF datasets.
            True means partitioning users of the dataset into train-test groups.
            False means partitioning each users' samples into train-test groups.
        train_test_split (float): The fraction of data for training; the rest are for testing.
            e.g., 0.9 means 90% of data are used for training and 10% are used for testing.
        quantity_weights (list[float]): The targeted distribution of quantities to simulate data quantity heterogeneity.
            The values should sum up to 1. e.g., [0.1, 0.2, 0.7].
            The `num_of_clients` should be divisible by `len(weights)`.
            None means clients are simulated with the same data quantity.
        alpha (float): The parameter for Dirichlet distribution simulation, applicable only when split_type is `dir`.

    Returns:
        dict: A dictionary of training data, e.g., {"id1": {"x": data, "y": label}, "id2": {"x": data, "y": label}}.
        dict: A dictionary of testing data.
        function: A function to preprocess training data.
        function: A function to preprocess testing data.
        torchvision.transforms.transforms.Compose: Training data transformation.
        torchvision.transforms.transforms.Compose: Testing data transformation.
    """
    user_str = "user" if user else "sample"
    setting = BaseDataset.get_setting_folder(dataset_name, split_type, num_of_clients, min_size, class_per_client,
                                             data_amount, iid_fraction, user_str, train_test_split, alpha,
                                             quantity_weights)

    # Each dataset may ship a `data_process/<dataset_name>.py` module with preprocessing hooks.
    dir_path = os.path.dirname(os.path.realpath(__file__))
    dataset_file = os.path.join(dir_path, "data_process", "{}.py".format(dataset_name))
    if not os.path.exists(dataset_file):
        logger.error("Please specify a valid process file path for process_x and process_y functions.")
    dataset_path = "easyfl.datasets.data_process.{}".format(dataset_name)
    dataset_lib = importlib.import_module(dataset_path)
    # Every hook is optional; a missing attribute resolves to None.
    process_x = getattr(dataset_lib, "process_x", None)
    process_y = getattr(dataset_lib, "process_y", None)
    transform_train = getattr(dataset_lib, "transform_train", None)
    transform_test = getattr(dataset_lib, "transform_test", None)

    data_dir = os.path.join(root, dataset_name)
    # Create the dataset directory when it is missing. The previous check tested the truthiness
    # of the path string, which is never empty here, so the directory was never created.
    os.makedirs(data_dir, exist_ok=True)
    train_data_dir = os.path.join(data_dir, setting, "train")
    test_data_dir = os.path.join(data_dir, setting, "test")

    if not os.path.exists(train_data_dir) or not os.path.exists(test_data_dir):
        # The dataset class lives in `easyfl.datasets.<name>.<name>` and is named `<Name>`.
        dataset_class_path = "easyfl.datasets.{}.{}".format(dataset_name, dataset_name)
        dataset_class_lib = importlib.import_module(dataset_class_path)
        class_name = dataset_name.capitalize()
        dataset = getattr(dataset_class_lib, class_name)(root=data_dir,
                                                         fraction=data_amount,
                                                         split_type=split_type,
                                                         user=user,
                                                         iid_user_fraction=iid_fraction,
                                                         train_test_split=train_test_split,
                                                         minsample=min_size,
                                                         num_of_client=num_of_clients,
                                                         class_per_client=class_per_client,
                                                         setting_folder=setting,
                                                         alpha=alpha,
                                                         weights=quantity_weights)
        # Best effort: a failed download is only logged, and the data are generated locally below.
        try:
            filename = f"{setting}.zip"
            dataset.download_packaged_dataset_and_extract(filename)
            logger.info(f"Downloaded packaged dataset {dataset_name}: {filename}")
        except Exception as e:
            logger.info(f"Failed to download packaged dataset: {e.args}")

        # CIFAR10 generate data in setup() stage, LEAF related datasets generate data in sampling()
        if not os.path.exists(train_data_dir):
            dataset.setup()
        if not os.path.exists(train_data_dir):
            dataset.sampling()

    # Client ids and group ids are not part of this function's return value.
    _, _, train_data, test_data = read_data(dataset_name, train_data_dir, test_data_dir)
    return train_data, test_data, process_x, process_y, transform_train, transform_test
def construct_datasets(root,
                       dataset_name,
                       num_of_clients,
                       split_type,
                       min_size,
                       class_per_client,
                       data_amount,
                       iid_fraction,
                       user,
                       train_test_split,
                       quantity_weights,
                       alpha):
    """Construct and load provided federated learning datasets.

    The raw data and the preprocessing hooks are obtained from ``load_data`` and
    wrapped into two ``FederatedTensorDataset`` objects.

    Args:
        root (str): The root directory where datasets stored.
        dataset_name (str): The name of the dataset. It currently supports: femnist, shakespeare, cifar10, and cifar100.
            Among them, femnist and shakespeare are adopted from LEAF benchmark.
        num_of_clients (int): The targeted number of clients to construct.
        split_type (str): The type of statistical simulation, options: iid, niid, dir, and class.
            `iid` means independent and identically distributed data.
            `niid` means non-independent and identically distributed data for Femnist and Shakespeare.
            `dir` means using Dirichlet process to simulate non-iid data, for CIFAR-10 and CIFAR-100 datasets.
            `class` means partitioning the dataset by label classes, for datasets like CIFAR-10, CIFAR-100.
        min_size (int): The minimal number of samples in each client.
            It is applicable for LEAF datasets and dir simulation of CIFAR-10 and CIFAR-100.
        class_per_client (int): The number of classes in each client. Only applicable when the split_type is 'class'.
        data_amount (float): The fraction of data sampled for LEAF datasets.
            e.g., 10% means that only 10% of total dataset size are used.
        iid_fraction (float): The fraction of the number of clients used when the split_type is 'iid'.
        user (bool): A flag to indicate whether partition users of the dataset into train-test groups.
            Only applicable to LEAF datasets.
            True means partitioning users of the dataset into train-test groups.
            False means partitioning each users' samples into train-test groups.
        train_test_split (float): The fraction of data for training; the rest are for testing.
            e.g., 0.9 means 90% of data are used for training and 10% are used for testing.
        quantity_weights (list[float]): The targeted distribution of quantities to simulate data quantity heterogeneity.
            The values should sum up to 1. e.g., [0.1, 0.2, 0.7].
            The `num_of_clients` should be divisible by `len(weights)`.
            None means clients are simulated with the same data quantity.
        alpha (float): The parameter for Dirichlet distribution simulation, applicable only when split_type is `dir`.

    Returns:
        :obj:`FederatedTensorDataset`: Training dataset.
        :obj:`FederatedTensorDataset`: Testing dataset.
    """
    train_data, test_data, process_x, process_y, transform_train, transform_test = load_data(root,
                                                                                             dataset_name,
                                                                                             num_of_clients,
                                                                                             split_type,
                                                                                             min_size,
                                                                                             class_per_client,
                                                                                             data_amount,
                                                                                             iid_fraction,
                                                                                             user,
                                                                                             train_test_split,
                                                                                             quantity_weights,
                                                                                             alpha)

    # The testing data are flagged as simulated for every dataset except the CIFAR ones.
    test_simulated = not (dataset_name == CIFAR10 or dataset_name == CIFAR100)

    train_data = FederatedTensorDataset(train_data,
                                        simulated=True,
                                        do_simulate=False,
                                        process_x=process_x,
                                        process_y=process_y,
                                        transform=transform_train)
    test_data = FederatedTensorDataset(test_data,
                                       simulated=test_simulated,
                                       do_simulate=False,
                                       process_x=process_x,
                                       process_y=process_y,
                                       transform=transform_test)
    return train_data, test_data