For a machine learning task, I have implemented, with help from online resources, a configurable PointNet version in TensorFlow Keras.
I took the Keras PointNet example from here: https://keras.io/examples/vision/pointnet/
I am using tensorflow = "2.11.0".
**What do I want?**
- I implemented a configurable PointNet class (see code below)
- I implemented it as a hybrid net, so that I can also feed in some metadata
- I have some STL files, which I load with open3d = "^0.18.0" and convert to point clouds
- I want to use a tf.data.Dataset pipeline to load the data and to pre- and postprocess it
- I want to predict a certain curve/values at the end, depending on the input (but this is not the problem)
**My Problem**
- Regardless of the number of points in a point cloud, the data is passed through the neural network, although this should not work "architecturally"
- The input layer of the PointNet has a static shape of (None, n_points, 3)
- I would normally expect an error message if I have a point cloud with shape (128, 3) and an input layer with shape (None, 10, 3). This error message does not appear, and the network can be trained epoch by epoch.
- Is this correct behavior?
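As a side note on the "architecturally" point, here is a minimal NumPy sketch (not the Keras code itself) of what a `Conv1D` layer with `kernel_size=1` followed by global max pooling computes. The names `weights`, `bias` and `pointwise_conv_then_global_max` are made up for illustration. It suggests that the layers themselves may not depend on the number of points, so any shape error would have to come from an explicit input check rather than from the computation.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical shared weights of one Conv1D(kernel_size=1) layer: 3 -> 8 channels.
weights = rng.normal(size=(3, 8))
bias = np.zeros(8)


def pointwise_conv_then_global_max(points):
    """Apply the same linear map + ReLU to every point, then max over the point axis."""
    features = np.maximum(points @ weights + bias, 0.0)  # shape (n_points, 8)
    return features.max(axis=0)  # shape (8,), independent of n_points


small_cloud = rng.normal(size=(10, 3))
large_cloud = rng.normal(size=(128, 3))

small_features = pointwise_conv_then_global_max(small_cloud)
large_features = pointwise_conv_then_global_max(large_cloud)

# Both clouds end up as a feature vector of the same length.
print(small_features.shape, large_features.shape)
```

The same weights handle 10 points and 128 points, because the point axis is only ever reduced by the max operation.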
**My code block for the Hybrid PointNet**
```python
import numpy as np
import tensorflow as tf
from omegaconf import DictConfig  # assumed: DictConfig comes from OmegaConf
from tensorflow import keras
from tensorflow.keras import layers


class HybridPointNetMeta:
    def __init__(self, cfg: DictConfig, **kwargs) -> None:
        self.ARCHITECTURE = cfg["ml_model"]["ARCHITECTURE"]

        # PointNet branch
        self.MODEL_POINT_NAME = cfg["ml_model"]["MODEL_POINT"]["NAME"]
        self.MODEL_POINT_INPUT_SHAPE = cfg["ml_model"]["MODEL_POINT"]["INPUT_SHAPE"]
        self.MODEL_POINT_NUM_FEATURES = cfg["ml_model"]["MODEL_POINT"]["NUM_FEATURES"]
        self.MODEL_POINT_FILTERS_CONV_1 = cfg["ml_model"]["MODEL_POINT"][
            "FILTERS_CONV_1"
        ]
        self.MODEL_POINT_FACTOR_FILTERS_2 = cfg["ml_model"]["MODEL_POINT"][
            "FACTOR_FILTERS_2"
        ]
        self.MODEL_POINT_FACTOR_FILTERS_3 = cfg["ml_model"]["MODEL_POINT"][
            "FACTOR_FILTERS_3"
        ]
        self.MODEL_POINT_UNITS_DENSE_1 = cfg["ml_model"]["MODEL_POINT"]["UNITS_DENSE_1"]
        self.MODEL_POINT_FACTOR_UNITS_2 = cfg["ml_model"]["MODEL_POINT"][
            "FACTOR_UNITS_2"
        ]
        self.MODEL_POINT_DROPOUT_RATE = cfg["ml_model"]["MODEL_POINT"]["DROPOUT_RATE"]
        self.MODEL_POINT_L2_REG = cfg["ml_model"]["MODEL_POINT"]["L2_REG"]
        self.MODEL_POINT_ACTIVATION = cfg["ml_model"]["MODEL_POINT"]["ACTIVATION"]
        self.MODEL_POINT_FILTERS_CONV_2 = int(
            self.MODEL_POINT_FILTERS_CONV_1 * self.MODEL_POINT_FACTOR_FILTERS_2
        )
        self.MODEL_POINT_FILTERS_CONV_3 = int(
            self.MODEL_POINT_FILTERS_CONV_1 * self.MODEL_POINT_FACTOR_FILTERS_3
        )
        self.MODEL_POINT_UNITS_DENSE_2 = int(
            self.MODEL_POINT_UNITS_DENSE_1 * self.MODEL_POINT_FACTOR_UNITS_2
        )

        # T-net
        self.MODEL_TNET_FILTERS_CONV_1 = cfg["ml_model"]["MODEL_TNET"]["FILTERS_CONV_1"]
        self.MODEL_TNET_FACTOR_FILTERS_2 = cfg["ml_model"]["MODEL_TNET"][
            "FACTOR_FILTERS_2"
        ]
        self.MODEL_TNET_FACTOR_FILTERS_3 = cfg["ml_model"]["MODEL_TNET"][
            "FACTOR_FILTERS_3"
        ]
        self.MODEL_TNET_UNITS_DENSE_1 = cfg["ml_model"]["MODEL_TNET"]["UNITS_DENSE_1"]
        self.MODEL_TNET_FACTOR_UNITS_2 = cfg["ml_model"]["MODEL_TNET"]["FACTOR_UNITS_2"]
        self.MODEL_TNET_FILTERS_CONV_2 = int(
            self.MODEL_TNET_FILTERS_CONV_1 * self.MODEL_TNET_FACTOR_FILTERS_2
        )
        self.MODEL_TNET_FILTERS_CONV_3 = int(
            self.MODEL_TNET_FILTERS_CONV_1 * self.MODEL_TNET_FACTOR_FILTERS_3
        )
        self.MODEL_TNET_UNITS_DENSE_2 = int(
            self.MODEL_TNET_UNITS_DENSE_1 * self.MODEL_TNET_FACTOR_UNITS_2
        )

        # Metadata branch
        self.MODEL_META_NAME = cfg["ml_model"]["MODEL_META"]["NAME"]
        self.MODEL_META_INPUT_SHAPE = cfg["ml_model"]["MODEL_META"]["INPUT_SHAPE"]
        self.MODEL_META_ACTIVATION = cfg["ml_model"]["MODEL_META"]["ACTIVATION"]
        self.MODEL_META_UNITS = cfg["ml_model"]["MODEL_META"]["UNITS"]

        # Merged head
        self.MODEL_MERGED_NAME = cfg["ml_model"]["MODEL_MERGED"]["NAME"]
        self.MODEL_MERGED_DROPOUT_RATE = cfg["ml_model"]["MODEL_MERGED"]["DROPOUT_RATE"]
        self.MODEL_MERGED_ACTIVATION_IN = cfg["ml_model"]["MODEL_MERGED"][
            "ACTIVATION_IN"
        ]
        self.MODEL_MERGED_ACTIVATION_OUT = cfg["ml_model"]["MODEL_MERGED"][
            "ACTIVATION_OUT"
        ]
        self.MODEL_MERGED_NUM_BLOCKS = cfg["ml_model"]["MODEL_MERGED"]["NUM_BLOCKS"]
        self.MODEL_MERGED_UNITS_DENSE_INI = cfg["ml_model"]["MODEL_MERGED"][
            "UNITS_DENSE_INI"
        ]
        self.MODEL_MERGED_UNITS_FACTOR = cfg["ml_model"]["MODEL_MERGED"]["UNITS_FACTOR"]
        self.MODEL_MERGED_UNITS_OUTPUT = cfg["ml_model"]["MODEL_MERGED"]["UNITS_OUTPUT"]

    def _build_dense_meta_model(self) -> keras.Model:
        inputs = keras.Input(self.MODEL_META_INPUT_SHAPE)
        x = layers.Dense(
            units=self.MODEL_META_UNITS, activation=self.MODEL_META_ACTIVATION
        )(inputs)
        model = keras.Model(inputs, outputs=x, name=self.MODEL_META_NAME)
        return model

    """
    ### Build a model
    Each convolution and fully-connected layer (with exception for end layers) consists of
    Convolution / Dense -> Batch Normalization -> ReLU Activation.
    """

    def conv_bn(self, x, filters):
        x = layers.Conv1D(filters, kernel_size=1, padding="valid")(x)
        x = layers.BatchNormalization(momentum=0.0)(x)
        return layers.Activation(self.MODEL_POINT_ACTIVATION)(x)

    def dense_bn(self, x, filters):
        x = layers.Dense(filters)(x)
        x = layers.BatchNormalization(momentum=0.0)(x)
        return layers.Activation(self.MODEL_POINT_ACTIVATION)(x)

    """
    PointNet consists of two core components. The primary MLP network, and the transformer
    net (T-net). The T-net aims to learn an affine transformation matrix by its own mini
    network. The T-net is used twice. The first time to transform the input features (n, 3)
    into a canonical representation. The second is an affine transformation for alignment in
    feature space (n, 3). As per the original paper we constrain the transformation to be
    close to an orthogonal matrix (i.e. ||X*X^T - I|| = 0).
    """

    class OrthogonalRegularizer(keras.regularizers.Regularizer):
        def __init__(self, num_features, l2reg=0.001, **kwargs):
            self.num_features = num_features
            self.l2reg = l2reg
            self.eye = tf.eye(num_features)

        def __call__(self, x):
            x = tf.reshape(x, (-1, self.num_features, self.num_features))
            xxt = tf.tensordot(x, x, axes=(2, 2))
            xxt = tf.reshape(xxt, (-1, self.num_features, self.num_features))
            return tf.reduce_sum(self.l2reg * tf.square(xxt - self.eye))

        def get_config(self):
            return {
                "num_features": self.num_features,
                "l2reg": self.l2reg,
            }

        @classmethod
        def from_config(cls, config):
            return cls(**config)

    """
    We can then define a general function to build T-net layers.
    """

    def tnet(self, inputs, num_features):
        # Initialise bias as the identity matrix
        bias = keras.initializers.Constant(np.eye(num_features).flatten())
        reg = self.OrthogonalRegularizer(num_features, self.MODEL_POINT_L2_REG)
        x = self.conv_bn(inputs, self.MODEL_TNET_FILTERS_CONV_1)
        x = self.conv_bn(x, self.MODEL_TNET_FILTERS_CONV_2)
        x = self.conv_bn(x, self.MODEL_TNET_FILTERS_CONV_3)
        x = layers.GlobalMaxPooling1D()(x)
        x = self.dense_bn(x, self.MODEL_TNET_UNITS_DENSE_1)
        x = self.dense_bn(x, self.MODEL_TNET_UNITS_DENSE_2)
        x = layers.Dense(
            num_features * num_features,
            kernel_initializer="zeros",
            bias_initializer=bias,
            activity_regularizer=reg,
        )(x)
        feat_T = layers.Reshape((num_features, num_features))(x)
        # Apply affine transformation to input features
        return layers.Dot(axes=(2, 1))([inputs, feat_T])

    def build_pointnet_model(self):
        """
        The main network can be then implemented in the same manner where the t-net mini models
        can be dropped in as layers in the graph. Here we replicate the network architecture
        published in the original paper but with half the number of weights at each layer as we
        are using the smaller 10 class ModelNet dataset.
        """
        inputs = keras.Input(shape=self.MODEL_POINT_INPUT_SHAPE)
        x = self.tnet(inputs, self.MODEL_POINT_NUM_FEATURES)
        x = self.conv_bn(x, self.MODEL_POINT_FILTERS_CONV_1)
        x = self.conv_bn(x, self.MODEL_POINT_FILTERS_CONV_1)
        x = self.tnet(x, self.MODEL_POINT_FILTERS_CONV_1)
        x = self.conv_bn(x, self.MODEL_POINT_FILTERS_CONV_1)
        x = self.conv_bn(x, self.MODEL_POINT_FILTERS_CONV_2)
        x = self.conv_bn(x, self.MODEL_POINT_FILTERS_CONV_3)
        x = layers.GlobalMaxPooling1D()(x)
        x = self.dense_bn(x, self.MODEL_POINT_UNITS_DENSE_1)
        x = layers.Dropout(self.MODEL_POINT_DROPOUT_RATE)(x)
        x = self.dense_bn(x, self.MODEL_POINT_UNITS_DENSE_2)
        x = layers.Dropout(self.MODEL_POINT_DROPOUT_RATE)(x)
        model = keras.Model(inputs, outputs=x, name=self.MODEL_POINT_NAME)
        return model

    def _build_final_layers_block(self, x):
        units = self.MODEL_MERGED_UNITS_DENSE_INI
        for block in range(1, self.MODEL_MERGED_NUM_BLOCKS + 1):
            x = layers.Dense(
                units=units,
                activation=self.MODEL_MERGED_ACTIVATION_IN,
            )(x)
            x = layers.Dropout(self.MODEL_MERGED_DROPOUT_RATE)(x)
            units = int(units * self.MODEL_MERGED_UNITS_FACTOR)
        outputs = layers.Dense(
            units=self.MODEL_MERGED_UNITS_OUTPUT,
            activation=self.MODEL_MERGED_ACTIVATION_OUT,
        )(x)
        return outputs

    def build_model_HybridPointNetMetav1(self):
        model_1 = self.build_pointnet_model()
        model_2 = self._build_dense_meta_model()
        input_1 = model_1.output
        input_2 = model_2.output
        x = layers.concatenate([input_1, input_2], name="concated_layers")
        outputs = self._build_final_layers_block(x)
        model = keras.Model(
            (model_1.inputs, model_2.inputs), outputs, name=self.MODEL_MERGED_NAME
        )
        return model

    def build_model(self):
        if self.ARCHITECTURE == "HybridPointNetMetav1":
            model = self.build_model_HybridPointNetMetav1()
        elif self.ARCHITECTURE == "PointNetv1":
            # Not implemented yet; raise instead of returning an undefined `model`.
            raise NotImplementedError("PointNetv1 is not implemented yet")
        return model
```
**My code block for the data ingestion**
```python
import time

import numpy as np
import open3d as o3d
import tensorflow as tf
from omegaconf import DictConfig  # assumed: DictConfig comes from OmegaConf

# `data.operations3d` is a module from my own project and is not shown here.


class TFDatasetGetter:
    """ABC"""

    def __init__(self, cfg: DictConfig) -> None:
        self.BATCHSIZE = cfg["ml_model"]["BATCHSIZE"]
        self.ARCHITECTURE = cfg["ml_model"]["ARCHITECTURE"]
        self.SCALE_DIVIDER_POINTS = cfg["ml_function"]["SCALE_DIVIDER_POINTS"]
        self.RAND_SEED = cfg["ml_function"]["RAND_SEED"]
        # initialize dataprocessor
        self.pc_processing = data.operations3d.PointCloudProcessing(cfg)
        self.pc_augmentation = data.operations3d.PointCloudAugmentation(cfg)

    ######################################################################
    # define methods
    def merge_col_names(self, prefix):
        # NOTE: self.NUM_POINTS_TARGETS is not set in __init__ in this excerpt.
        col_names = [prefix + str(x) for x in range(self.NUM_POINTS_TARGETS)]
        return col_names

    def _load_pointcloud(self, filepath):
        filepath = filepath.numpy().decode("utf-8")
        mesh = self.pc_processing.load_stl_as_mesh_o3d(filepath)
        pointcloud = self.pc_processing.create_point_cloud_o3d(mesh)
        ### scale pointcloud data, because loading in pipe and not available in default preprocessing
        array_pointcloud = np.asarray(pointcloud.points) / self.SCALE_DIVIDER_POINTS
        print(array_pointcloud.shape)
        return array_pointcloud

    def _load_pointcloud_random(self, filepath):
        seed = round(time.time())
        o3d.utility.random.seed(seed)
        return self._load_pointcloud(filepath)

    def _load_pointcloud_not_random(self, filepath):
        o3d.utility.random.seed(
            self.RAND_SEED
        )  # for getting reproducible results for test and val data
        return self._load_pointcloud(filepath)

    def _get_preprocessed_ds_pointclouds(self, df_3d, df_meta, df_targets, which_set):
        ### create tf datasets 3d, meta, targets for pipe
        ds_3d = tf.data.Dataset.from_tensor_slices(df_3d, name=f"ds_{which_set}_3d")
        if which_set == "train":
            ### define lambda functions for loading and augmenting random pointclouds
            lambda_load_pointcloud = lambda filepath: tf.py_function(
                self._load_pointcloud_random, [filepath[0]], Tout=tf.float32
            )
            lambda_augment_pointcloud = lambda array: tf.py_function(
                self.pc_augmentation.augment_pointcloud, [array], Tout=tf.float32
            )
            ds_3d = ds_3d.map(lambda_load_pointcloud)
            ds_3d = ds_3d.map(lambda_augment_pointcloud)
        if which_set in ("test", "val"):
            ### define lambda functions for loading not random pointclouds
            lambda_load_pointcloud = lambda filepath: tf.py_function(
                self._load_pointcloud_not_random, [filepath[0]], Tout=tf.float32
            )
            ds_3d = ds_3d.map(lambda_load_pointcloud)
        ds_meta = tf.data.Dataset.from_tensor_slices(
            df_meta, name=f"ds_{which_set}_meta"
        )
        ds_targets = tf.data.Dataset.from_tensor_slices(
            df_targets, name=f"ds_{which_set}_targets"
        )
        return ds_3d, ds_meta, ds_targets

    def _get_postprocessed_ds(self, ds, which_set):
        if which_set == "train":
            ds = (
                ds.shuffle(buffer_size=1000, seed=self.RAND_SEED)
                .batch(self.BATCHSIZE)
                .prefetch(tf.data.AUTOTUNE)
            )
        if which_set in ("test", "val"):
            ds = ds.batch(self.BATCHSIZE).prefetch(tf.data.AUTOTUNE)
        return ds

    def get_tf_dataset(self, df_3d, df_meta, df_y, which_set) -> tf.data.Dataset:
        ### get tf train, val, test datasets
        ds_3d, ds_meta, ds_y = self._get_preprocessed_ds_pointclouds(
            df_3d, df_meta, df_y, which_set
        )
        ### zip 3d, meta, targets datasets in each train, val, test tf dataset
        if self.ARCHITECTURE == "HybridPointNetMetav1":
            ds = tf.data.Dataset.zip(((ds_3d, ds_meta), ds_y))
        elif self.ARCHITECTURE == "MetaNetv1":
            ds = tf.data.Dataset.zip(
                ((ds_meta), ds_y)
            )  # check if correct format or only ds_meta, ds_y without list
        ds = self._get_postprocessed_ds(ds, which_set)
        return ds
```
**What have I already tried?**
- I tested by process of elimination. First, I implemented and tested the tutorial at https://keras.io/examples/vision/pointnet/.
- In the tutorial, an error message is displayed if the shape of the input layer is not the same as the shape of the respective point cloud.
- I tested whether the cause is the structure of my PointNet implementation. I fed my PointNet with the data from the https://keras.io/examples/vision/pointnet/ example. An error occurs here if the shapes of the input layer and the point cloud do not match (this is the expected result).
- I tested whether the cause is the data pipeline. To do this, I took my implementation of the data pipeline and fed its output into the PointNet from the https://keras.io/examples/vision/pointnet/ example. No error message occurs here, even if the shape of the input layer does not correspond to that of the point cloud (this is not the expected behavior).
- The error must therefore be in the data pipeline (tf.data.Dataset).
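One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: tensors returned by `tf.py_function` carry no static shape information, so a shape check against the input layer may have nothing to compare against. The sketch below uses a made-up loader, `load_fake_pointcloud`, and a made-up constant, `EXPECTED_SHAPE`, in place of the STL loading code. It prints the `element_spec` of such a dataset and shows how `tf.ensure_shape` could make a mismatch visible.

```python
import numpy as np
import tensorflow as tf


def load_fake_pointcloud(n_points):
    # Hypothetical stand-in for the STL loader: n_points random 3D points.
    return np.random.rand(int(n_points), 3).astype(np.float32)


# Every element asks for a cloud of 128 points.
sizes = tf.data.Dataset.from_tensor_slices([128, 128, 128, 128])

# As in the pipeline above, the loader is wrapped in tf.py_function.
ds_unknown = sizes.map(
    lambda n: tf.py_function(load_fake_pointcloud, [n], Tout=tf.float32)
)
print(ds_unknown.element_spec)  # the static shape is reported as unknown

# Assumed target shape, matching an input layer of (None, 10, 3).
EXPECTED_SHAPE = (10, 3)
ds_checked = ds_unknown.map(lambda pc: tf.ensure_shape(pc, EXPECTED_SHAPE))
print(ds_checked.element_spec)  # the static shape is now (10, 3)

# Iterating the checked dataset should fail, because the clouds have 128 points.
try:
    next(iter(ds_checked))
    mismatch_detected = False
except tf.errors.InvalidArgumentError:
    mismatch_detected = True
print(mismatch_detected)
```

If the first `element_spec` in the real pipeline also shows an unknown shape, adding a `tf.ensure_shape` (or resampling to a fixed number of points) after the `tf.py_function` call would be one way to get the expected error back.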
So the question is: is it expected behavior to get no error when the shape of the point cloud does not match the shape of the PointNet input layer? I cannot find any mistake in the code.
Thanks for your help.