import sys
import os
import shutil
import math

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../')  # add project root directory

from dataset.shapenet import ShapeNetPartSegDataset
from model.pointnet2_part_seg import PointNet2PartSegmentNet
import torch_geometric.transforms as GT
import torch
import argparse
from distutils.util import strtobool

import numpy as np
from sklearn.cluster import DBSCAN
import open3d as o3d
import pointcloud as pc  # project-local point cloud helpers

def eval_sample(net, sample):
    '''
    sample: { 'points': tensor(n, 3), 'labels': tensor(n,) }
    return: (pred_label, gt_label), both with shape (n,)
    '''
    net.eval()
    with torch.no_grad():
        # points: (n, 3)
        points, gt_label = sample['points'], sample['labels']
        n = points.shape[0]

        points = points.view(1, n, 3)  # make a batch
        points = points.transpose(1, 2).contiguous()  # (1, 3, n)
        points = points.to(device, dtype)

        pred = net(points)  # (batch_size, n, num_classes)
        pred_label = pred.max(2)[1]
        pred_label = pred_label.view(-1).cpu()  # (n,)

        assert pred_label.shape == gt_label.shape
        return (pred_label, gt_label)

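# Usage sketch (assumes `net`, `device` and `dtype` are initialized as in the
# __main__ block below):
#   pred_label, gt_label = eval_sample(net, test_dataset[0])
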
def mini_color_table(index, norm=True):
    colors = [
        [0.5000, 0.5400, 0.5300], [0.8900, 0.1500, 0.2100], [0.6400, 0.5800, 0.5000],
        [1.0000, 0.3800, 0.0100], [1.0000, 0.6600, 0.1400], [0.4980, 1.0000, 0.0000],
        [0.4980, 1.0000, 0.8314], [0.9412, 0.9725, 1.0000], [0.5412, 0.1686, 0.8863],
        [0.5765, 0.4392, 0.8588], [0.3600, 0.1400, 0.4300], [0.5600, 0.3700, 0.6000],
    ]

    color = colors[index % len(colors)]

    if not norm:
        color[0] *= 255
        color[1] *= 255
        color[2] *= 255

    return color

def label2color(labels):
    '''
    labels: np.ndarray with shape (n,)
    return: color np.ndarray with shape (n, 3)
    '''
    num = labels.shape[0]
    colors = np.zeros((num, 3))

    minl, maxl = np.min(labels), np.max(labels)
    for l in range(minl, maxl + 1):
        colors[labels == l, :] = mini_color_table(l)

    return colors

def clusterToColor(cluster, cluster_idx):
    # Give every point of the cluster the same color from the mini color table.
    colors = np.zeros(shape=(len(cluster), 3))
    colors[:] = mini_color_table(cluster_idx)

    return colors

def farthest_point_sampling(pts, K):
    if pts.shape[0] < K:
        return pts

    def calc_distances(p0, points):
        # Squared euclidean distance on the xyz columns only.
        return ((p0[:3] - points[:, :3]) ** 2).sum(axis=1)

    # Greedily pick the point farthest from the set chosen so far,
    # starting from a random seed point.
    farthest_pts = np.zeros((K, pts.shape[1]))
    farthest_pts[0] = pts[np.random.randint(len(pts))]
    distances = calc_distances(farthest_pts[0], pts)
    for i in range(1, K):
        farthest_pts[i] = pts[np.argmax(distances)]
        distances = np.minimum(distances, calc_distances(farthest_pts[i], pts))

    return farthest_pts

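# Usage sketch: farthest point sampling preserves the shape outline better than
# uniform random subsampling, at O(K * n) cost, e.g.:
#   pcc = farthest_point_sampling(pcc, 2048)
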
def append_onehotencoded_type(data, factor=1.0):
    # Column 6 holds the integer type index (assumed to lie in 0..3);
    # append it one-hot encoded, scaled by `factor`.
    types = data[:, 6].astype(int)
    res = np.zeros((len(types), 4))
    res[np.arange(len(types)), types] = factor

    return np.column_stack((data, res))

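# Worked example: with factor=1.0, a row [x, y, z, nx, ny, nz, 2] becomes
# [x, y, z, nx, ny, nz, 2, 0, 0, 1, 0].
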
def append_normal_angles(data):
    # Convert each unit normal (columns 3:6) to spherical angles (theta, phi),
    # both normalized to [0, 1], and append them as two extra columns.
    def func(x):
        theta = math.acos(x[2]) / math.pi
        phi = (math.atan2(x[1], x[0]) + math.pi) / (2.0 * math.pi)
        return (theta, phi)

    res = np.array([func(xi) for xi in data[:, 3:6]])

    return np.column_stack((data, res))

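# Worked example: the unit normal (0, 0, 1) maps to theta = acos(1) / pi = 0.0
# and phi = (atan2(0, 0) + pi) / (2 * pi) = 0.5, so both angles lie in [0, 1].
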
def extract_clusters(data, selected_indices, eps, min_samples, metric='euclidean', algo='auto'):
    # min_samples is given as a fraction of the dataset size.
    min_samples = min_samples * len(data)

    print('Clustering. Min Samples: ' + str(min_samples) + ' EPS: ' + str(eps))

    # Column layout of data:
    # 0,1,2: position
    # 3,4,5: normal
    # 6: type index
    # 7,8,9,10: type index, one-hot encoded
    # 11,12: normal as angles

    db_res = DBSCAN(eps=eps, metric=metric, n_jobs=-1, algorithm=algo,
                    min_samples=min_samples).fit(data[:, selected_indices])

    labels = db_res.labels_
    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    n_noise = list(labels).count(-1)
    print("Noise: " + str(n_noise) + " Clusters: " + str(n_clusters))

    clusters = {}
    for idx, l in enumerate(labels):
        if l == -1:  # -1 marks noise points; 'l is -1' would never match a numpy int
            continue
        clusters.setdefault(str(l), []).append(data[idx, :])

    npClusters = []
    for cluster in clusters.values():
        npClusters.append(np.array(cluster))

    return npClusters

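# Usage sketch (hypothetical parameter values): cluster on position and normal,
# with the density threshold set to 0.5% of the points:
#   pc_clusters = extract_clusters(data, [0, 1, 2, 3, 4, 5], eps=0.1, min_samples=0.005)
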
def draw_clusters(clusters):
    clouds = []

    # Open3D >= 0.8 moved these classes out of the flat namespace
    # (o3d.PointCloud etc.) into o3d.geometry / o3d.utility / o3d.visualization.
    for cluster_idx, cluster in enumerate(clusters):
        cloud = o3d.geometry.PointCloud()
        cloud.points = o3d.utility.Vector3dVector(cluster[:, :3])
        cloud.colors = o3d.utility.Vector3dVector(clusterToColor(cluster, cluster_idx))
        clouds.append(cloud)

    o3d.visualization.draw_geometries(clouds)

def draw_sample_data(sample_data, colored_normals=False):
    cloud = o3d.geometry.PointCloud()
    cloud.points = o3d.utility.Vector3dVector(sample_data[:, :3])
    # Color either by label (column 6) or by the raw normals.
    cloud.colors = o3d.utility.Vector3dVector(
        label2color(sample_data[:, 6].astype(int)) if not colored_normals else sample_data[:, 3:6])

    o3d.visualization.draw_geometries([cloud])

def recreate_folder(folder):
    if os.path.exists(folder) and os.path.isdir(folder):
        shutil.rmtree(folder)
    os.mkdir(folder)

parser = argparse.ArgumentParser()
parser.add_argument('--npoints', type=int, default=2048, help='number of points to resample to')
parser.add_argument('--model', type=str, default='./checkpoint/seg_model_custom_0.pth', help='model path')
parser.add_argument('--sample_idx', type=int, default=0, help='select a sample to segment and view the result')
parser.add_argument('--headers', type=strtobool, default=True, help='whether the raw files come with headers')
parser.add_argument('--collate_per_segment', type=strtobool, default=True,
                    help='whether to collate whole pointclouds or their sub-segments')
parser.add_argument('--has_variations', type=strtobool, default=False,
                    help='whether a single pointcloud has variations named int(id)_pc.(xyz|dat)')

opt = parser.parse_args()
print(opt)

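# Note: distutils.util.strtobool maps strings like 'y', 'yes', 'true', '1' to 1
# and 'n', 'no', 'false', '0' to 0, so the boolean flags above are passed as
# e.g. --headers true.
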
if __name__ == '__main__':

    # Create dataset
    print('Creating dataset ..')

    dataset_folder = './data/raw/predict/'
    pointcloud_file = './pointclouds/0_0.xyz'

    # Load and preprocess the point cloud
    pcloud = pc.read_pointcloud(pointcloud_file)
    pcloud = pc.normalize_pointcloud(pcloud, 1)

    # a, b = pc.split_outliers(pcloud, [3, 4, 5])
    # draw_sample_data(a, True)
    # draw_sample_data(b, True)
    # pcloud = a

    # for 0_0.xyz: pc.hierarchical_clustering(pcloud, [0, 1, 2, 3, 4, 5], eps=0.1, min_samples=5)

    # pc_clusters = pc.hierarchical_clustering(pcloud, [0, 1, 2, 3, 4, 5], eps=0.1, min_samples=5)
    # pc_clusters = pc.filter_clusters(pc_clusters, 100)

    # Clustering is disabled here; treat the whole cloud as a single cluster.
    pc_clusters = [pcloud]

    print("Number of clusters:", len(pc_clusters))

    draw_clusters(pc_clusters)
    for c in pc_clusters:
        draw_sample_data(c, True)
        print("Cluster size:", len(c))

    # draw_sample_data(pcloud)

    # Split the cloud via pc.cluster_cubes into a [1, 1, 1] grid of cells.
    pc_clusters = pc.cluster_cubes(pcloud, [1, 1, 1])

    # Resample each cluster and write it out as its own prediction sample.
    recreate_folder(dataset_folder)
    for idx, pcc in enumerate(pc_clusters):
        pcc = farthest_point_sampling(pcc, opt.npoints)
        recreate_folder(dataset_folder + str(idx) + '/')
        pc.write_pointcloud(dataset_folder + str(idx) + '/pc.xyz', pcc)
        # draw_sample_data(pcc, False)

    # Load dataset
    print('Loading dataset ..')
    test_transform = GT.Compose([GT.NormalizeScale(), ])

    test_dataset = ShapeNetPartSegDataset(
        mode='predict',
        root_dir='data',
        npoints=opt.npoints,
        refresh=False,
        collate_per_segment=opt.collate_per_segment,
        has_variations=opt.has_variations,
        headers=opt.headers
    )

    num_classes = test_dataset.num_classes()

    # Load model
    print('Constructing model ..')
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    dtype = torch.float

    net = PointNet2PartSegmentNet(num_classes)

    net.load_state_dict(torch.load(opt.model, map_location=device.type))
    net = net.to(device, dtype)
    net.eval()

    labeled_dataset = None

    # Iterate over all samples and predict, stacking the per-sample results
    # into one labeled cloud of rows [x, y, z, nx, ny, nz, pred_label].
    for sample in test_dataset:
        pred_label, gt_label = eval_sample(net, sample)
        sample_data = np.column_stack((sample["points"].numpy(), sample["normals"].numpy(), pred_label.numpy()))

        if labeled_dataset is None:
            labeled_dataset = sample_data
        else:
            labeled_dataset = np.vstack((labeled_dataset, sample_data))

    print("Prediction done.")

    draw_sample_data(labeled_dataset, False)

    print("Point cloud size:", labeled_dataset.shape)

    # Compare the extent of the labeled result with the original cloud.
    print("Labeled min:", np.min(labeled_dataset[:, :3]))
    print("Labeled max:", np.max(labeled_dataset[:, :3]))
    print("Original min:", np.min(pcloud[:, :3]))
    print("Original max:", np.max(pcloud[:, :3]))