import heapq
import logging
import math
import numpy as np
# Data-partition strategy identifiers accepted by data_simulation().
SIMULATE_IID = "iid"  # equal-quantity random split per client
SIMULATE_NIID_DIR = "dir"  # non-IID split drawn from a Dirichlet distribution
SIMULATE_NIID_CLASS = "class"  # non-IID split grouped by label classes
# Module-level logger used by print_data_distribution().
logger = logging.getLogger(__name__)
def shuffle(data_x, data_y):
    """Shuffle data and labels in unison.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of labels aligned with ``data_x``.

    Returns:
        tuple[numpy.ndarray, numpy.ndarray]: The shuffled data and labels,
        still index-aligned with each other.
    """
    data_x = np.array(data_x)
    data_y = np.array(data_y)
    # A single random permutation keeps x[i] and y[i] paired after shuffling;
    # this replaces building an explicit index list and shuffling it in place.
    index = np.random.permutation(len(data_y))
    return data_x[index], data_y[index]
def equal_division(num_groups, data_x, data_y=None):
    """Randomly partition data into ``num_groups`` parts of (near-)equal size.

    When the data cannot be divided evenly, the trailing groups each receive
    one extra element, so group sizes differ by at most one.

    Args:
        num_groups (int): The number of groups to partition to.
        data_x (list[Object]): A list of elements to be divided.
        data_y (list[Object], optional): A list of data labels to be divided
            together with the data.

    Returns:
        list[list]: A list where each element is a list of data of a group/client.
        list[list]: A list where each element is a list of data label of a
            group/client (empty when ``data_y`` is None).

    Example:
        >>> equal_division(3, list(range(9)))
        >>> ([[0,4,2],[3,1,7],[6,5,8]], [])
    """
    if data_y is None:
        np.random.shuffle(data_x)
    else:
        assert (len(data_x) == len(data_y))
        data_x, data_y = shuffle(data_x, data_y)
    total = len(data_x)
    assert total > 0
    # ``extra`` groups must take one element more than ``base`` to consume all data.
    base, extra = divmod(total, num_groups)
    groups_x = []
    groups_y = []
    start = 0
    first_small = num_groups - extra  # the leading groups take ``base`` elements
    for g in range(num_groups):
        size = base if g < first_small else base + 1
        groups_x.append(data_x[start:start + size])
        if data_y is not None:
            groups_y.append(data_y[start:start + size])
        start += size
    return groups_x, groups_y
def quantity_hetero(weights, data_x, data_y=None):
    """Partition data into multiple clients with different quantities.

    The number of groups is the same as the number of elements of `weights`.
    The quantity of each group depends on the values of `weights`.

    Args:
        weights (list[float]): The targeted distribution of data quantities.
            The values should sum up to 1. e.g., [0.1, 0.2, 0.7].
        data_x (list[Object]): A list of elements to be divided.
        data_y (list[Object], optional): A list of data labels to be divided
            together with the data.

    Returns:
        list[list]: A list where each element is a list of data of a group/client.
        list[list]: A list where each element is a list of data label of a
            group/client (empty when ``data_y`` is None).

    Example:
        >>> quantity_hetero([0.1, 0.2, 0.7], list(range(0, 10)))
        >>> ([[4], [8, 9], [6, 0, 1, 7, 3, 2, 5]], [])
    """
    # Rounded comparison guards against float representation error,
    # e.g. sum([0.1, 0.2, 0.4, 0.2, 0.1]) is 1.0000000000000002, not exactly 1.
    assert (round(sum(weights), 3) == 1)
    if data_y is not None:
        assert (len(data_x) == len(data_y))
        data_x, data_y = shuffle(data_x, data_y)
    else:
        np.random.shuffle(data_x)
    data_size = len(data_x)
    splitted_data_x = []
    splitted_data_y = []
    offset = 0
    for w in weights:
        size = math.floor(data_size * w)
        # BUGFIX: materialize slices as plain lists so the remainder loop
        # below can append to them. Slicing a numpy array (the case when the
        # inputs come through shuffle(), or data_x is already an ndarray)
        # yields an ndarray, which has no ``append`` method.
        splitted_data_x.append(list(data_x[offset:offset + size]))
        if data_y is not None:
            splitted_data_y.append(list(data_y[offset:offset + size]))
        offset += size
    # Flooring can leave a few trailing elements unassigned; hand them out
    # one per group (last element to group 1, previous to group 2, ...),
    # matching the original round-robin order.
    parts = len(weights)
    remain = data_size - offset
    for j in range(-remain, 0):
        splitted_data_x[(-j) % parts].append(data_x[j])
        if data_y is not None:
            splitted_data_y[(-j) % parts].append(data_y[j])
    return splitted_data_x, splitted_data_y
def iid(data_x, data_y, num_of_clients, x_dtype, y_dtype):
    """Partition a dataset into clients of (near-)equal size, uniformly at random.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data.
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    data_x, data_y = shuffle(data_x, data_y)
    x_parts, y_parts = equal_division(num_of_clients, data_x, data_y)
    clients = []
    federated_data = {}
    for idx, (part_x, part_y) in enumerate(zip(x_parts, y_parts)):
        client_id = "f%07.0f" % (idx)
        federated_data[client_id] = {
            'x': np.array(part_x).astype(x_dtype),
            'y': np.array(part_y).astype(y_dtype),
        }
        clients.append(client_id)
    return clients, federated_data
def non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size, x_dtype, y_dtype):
    """Partition dataset into multiple clients following the Dirichlet process.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        alpha (float): The parameter for Dirichlet process simulation.
            Smaller alpha yields a more skewed (more non-IID) split.
        min_size (int): The minimum number of data size of a client.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data.
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    n_train = data_x.shape[0]  # NOTE(review): unused in this function
    current_min_size = 0
    # Labels are assumed to be contiguous integers 0..max — TODO confirm.
    num_class = np.amax(data_y) + 1
    data_size = data_y.shape[0]
    net_dataidx_map = {}
    # Resample the whole allocation until every client holds >= min_size samples.
    while current_min_size < min_size:
        idx_batch = [[] for _ in range(num_of_clients)]
        for k in range(num_class):
            idx_k = np.where(data_y == k)[0]
            np.random.shuffle(idx_k)
            proportions = np.random.dirichlet(np.repeat(alpha, num_of_clients))
            # using the proportions from dirichlet, only select those clients
            # having data amount less than the average so far; others get 0.
            proportions = np.array(
                [p * (len(idx_j) < data_size / num_of_clients) for p, idx_j in zip(proportions, idx_batch)])
            # scale proportions back to sum 1 after zeroing some clients
            proportions = proportions / proportions.sum()
            # Turn the cumulative proportions into split points over class k's indices.
            proportions = (np.cumsum(proportions) * len(idx_k)).astype(int)[:-1]
            idx_batch = [idx_j + idx.tolist() for idx_j, idx in zip(idx_batch, np.split(idx_k, proportions))]
        current_min_size = min([len(idx_j) for idx_j in idx_batch])
    federated_data = {}
    clients = []
    for j in range(num_of_clients):
        # Shuffle within each client so samples are not ordered by class.
        np.random.shuffle(idx_batch[j])
        client_id = "f%07.0f" % j
        clients.append(client_id)
        temp = {}
        temp['x'] = np.array(data_x[idx_batch[j]]).astype(x_dtype)
        temp['y'] = np.array(data_y[idx_batch[j]]).astype(y_dtype)
        federated_data[client_id] = temp
        net_dataidx_map[client_id] = idx_batch[j]
    print_data_distribution(data_y, net_dataidx_map)
    return clients, federated_data
def non_iid_class(data_x, data_y, class_per_client, num_of_clients, x_dtype, y_dtype, stack_x=True):
    """Partition dataset into multiple clients based on label classes.

    Each client contains [1, n] classes, where n is the number of classes of a dataset.

    Note: Each class is divided into `ceil(class_per_client * num_of_clients / num_class)` parts
    and each client chooses `class_per_client` parts from each class to construct its dataset.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        class_per_client (int): The number of classes in each client.
        num_of_clients (int): The number of clients to partition to.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.
        stack_x (bool, optional): A flag to indicate whether using np.vstack or
            append to construct dataset.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data.
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    # Labels are assumed to be contiguous integers 0..max — TODO confirm.
    num_class = np.amax(data_y) + 1
    all_index = []
    clients = []
    data_index_map = {}
    for i in range(num_class):
        # get indexes for all data with current label i at index i in all_index
        all_index.append(np.where(data_y == i)[0].tolist())
    federated_data = {}
    # total no. of parts
    total_amount = class_per_client * num_of_clients
    # no. of parts each class should be divided into
    parts_per_class = math.ceil(total_amount / num_class)
    # Initialize every client with an empty dataset and an empty index record.
    for i in range(num_of_clients):
        client_id = "f%07.0f" % (i)
        clients.append(client_id)
        data_index_map[client_id] = []
        data = {}
        data['x'] = np.array([])
        data['y'] = np.array([])
        federated_data[client_id] = data
    class_map = {}
    parts_consumed = []
    for i in range(num_class):
        # Split class i's indices into equal parts; the min-heap entries are
        # (parts consumed so far, class id), so the least-used class pops first.
        class_map[i], _ = equal_division(parts_per_class, all_index[i])
        heapq.heappush(parts_consumed, (0, i))
    for i in clients:
        for j in range(class_per_client):
            # Always draw the next part from the least-consumed class.
            class_chosen = heapq.heappop(parts_consumed)
            part_indexes = class_map[class_chosen[1]].pop(0)
            if len(federated_data[i]['x']) != 0:
                if stack_x:
                    federated_data[i]['x'] = np.vstack((federated_data[i]['x'], data_x[part_indexes])).astype(x_dtype)
                else:
                    federated_data[i]['x'] = np.append(federated_data[i]['x'], data_x[part_indexes]).astype(x_dtype)
                federated_data[i]['y'] = np.append(federated_data[i]['y'], data_y[part_indexes]).astype(y_dtype)
            else:
                # First part for this client: assign directly to keep dtype/shape.
                federated_data[i]['x'] = data_x[part_indexes].astype(x_dtype)
                federated_data[i]['y'] = data_y[part_indexes].astype(y_dtype)
            # Push the class back with its consumption count incremented.
            heapq.heappush(parts_consumed, (class_chosen[0] + 1, class_chosen[1]))
            data_index_map[i].extend(part_indexes)
    print_data_distribution(data_y, data_index_map)
    return clients, federated_data
def data_simulation(data_x, data_y, num_of_clients, data_distribution, weights=None, alpha=0.5, min_size=10,
                    class_per_client=1, stack_x=True):
    """Simulate federated learning datasets by partitioning a data into multiple clients using different strategies.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        data_distribution (str): The ways to partition the dataset, options:
            `iid`: Partition dataset into multiple clients with equal quantity
            (difference is less than 1) randomly.
            `dir`: Partition dataset into multiple clients following the Dirichlet process.
            `class`: Partition dataset into multiple clients based on classes.
        weights (list[float], optional): The targeted distribution of data quantities,
            used to simulate data quantity heterogeneity.
            The values should sum up to 1, e.g., [0.1, 0.2, 0.7].
            When `weights=None`, the data quantity of clients only depends on data_distribution.
            Note: `num_of_clients` should be divisible by `len(weights)`.
        alpha (float, optional): The parameter for Dirichlet process simulation.
            It is only applicable when data_distribution is `dir`.
        min_size (int, optional): The minimum number of data size of a client.
            It is only applicable when data_distribution is `dir`.
        class_per_client (int): The number of classes in each client.
            It is only applicable when data_distribution is `class`.
        stack_x (bool, optional): A flag to indicate whether using np.vstack or
            append to construct dataset.
            It is only applicable when data_distribution is `class`.

    Raise:
        ValueError: When the simulation method `data_distribution` is not supported.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data.
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    data_x = np.array(data_x)
    data_y = np.array(data_y)
    x_dtype = data_x.dtype
    y_dtype = data_y.dtype
    if weights is not None:
        assert num_of_clients % len(weights) == 0
        # First split into num_of_clients/len(weights) groups; each group is
        # re-split by `weights` below, restoring the requested client count.
        num_of_clients = num_of_clients // len(weights)
    if data_distribution == SIMULATE_IID:
        group_client_list, group_federated_data = iid(data_x, data_y, num_of_clients, x_dtype, y_dtype)
    elif data_distribution == SIMULATE_NIID_DIR:
        group_client_list, group_federated_data = non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size,
                                                                    x_dtype, y_dtype)
    elif data_distribution == SIMULATE_NIID_CLASS:
        group_client_list, group_federated_data = non_iid_class(data_x, data_y, class_per_client, num_of_clients,
                                                                x_dtype,
                                                                y_dtype, stack_x=stack_x)
    else:
        raise ValueError("Simulation type not supported")
    if weights is None:
        return group_client_list, group_federated_data
    # Quantity-heterogeneity pass: re-split every group's data by `weights`
    # and re-number the resulting clients sequentially.
    clients = []
    federated_data = {}
    cur_key = 0
    for i in group_client_list:
        current_client = group_federated_data[i]
        input_lists, label_lists = quantity_hetero(weights, current_client['x'], current_client['y'])
        for j in range(len(input_lists)):
            client_id = "f%07.0f" % (cur_key)
            temp_client = {}
            temp_client['x'] = np.array(input_lists[j]).astype(x_dtype)
            temp_client['y'] = np.array(label_lists[j]).astype(y_dtype)
            federated_data[client_id] = temp_client
            clients.append(client_id)
            cur_key += 1
    return clients, federated_data
def print_data_distribution(data_y, data_index_map):
    """Log and return the per-client label distribution.

    Args:
        data_y (numpy.ndarray): Labels for the full dataset.
        data_index_map (dict): Mapping from client id to the list of sample
            indices held by that client.

    Returns:
        dict: Mapping from client id to a ``{label: count}`` dictionary.
    """
    data_distribution = {}
    for client_id, sample_indexes in data_index_map.items():
        labels, counts = np.unique(data_y[sample_indexes], return_counts=True)
        data_distribution[client_id] = dict(zip(labels, counts))
    logger.info(data_distribution)
    return data_distribution