# Source: Markus Friedrich, commit e4cc447f68 ("stuff"), 2019-08-09 17:28:04 +02:00
# (repository-viewer metadata converted to a comment so the file parses)
import numpy as np
import open3d as o3d
from sklearn.cluster import DBSCAN
from pyod.models.knn import KNN
from pyod.models.sod import SOD
from pyod.models.abod import ABOD
from pyod.models.sos import SOS
from pyod.models.pca import PCA
from pyod.models.ocsvm import OCSVM
from pyod.models.mcd import MCD
from pyod.models.lof import LOF
from pyod.models.cof import COF
from pyod.models.cblof import CBLOF
from pyod.models.loci import LOCI
from pyod.models.hbos import HBOS
from pyod.models.lscp import LSCP
from pyod.models.feature_bagging import FeatureBagging
# Fixed 12-entry RGB palette shared by all mini_color_table() calls.
# Hoisted to module level so it is not rebuilt on every call.
_MINI_COLOR_PALETTE = [
    [0.5000, 0.5400, 0.5300], [0.8900, 0.1500, 0.2100], [0.6400, 0.5800, 0.5000],
    [1.0000, 0.3800, 0.0100], [1.0000, 0.6600, 0.1400], [0.4980, 1.0000, 0.0000],
    [0.4980, 1.0000, 0.8314], [0.9412, 0.9725, 1.0000], [0.5412, 0.1686, 0.8863],
    [0.5765, 0.4392, 0.8588], [0.3600, 0.1400, 0.4300], [0.5600, 0.3700, 0.6000],
]


def mini_color_table(index, norm=True):
    """Return the palette RGB color for *index*, wrapped modulo the palette size.

    Args:
        index: any integer; mapped onto the 12-entry palette via modulo.
        norm: when True the components stay in [0, 1]; otherwise each is
            scaled by 255.

    Returns:
        A fresh [r, g, b] list of floats. Returning a copy (instead of a
        reference into the palette) means callers — and the norm=False
        scaling itself — can never corrupt the shared table.
    """
    color = list(_MINI_COLOR_PALETTE[index % len(_MINI_COLOR_PALETTE)])
    if not norm:
        color = [channel * 255 for channel in color]
    return color
def clusterToColor(cluster, cluster_idx):
    """Return an (N, 3) float array where every row is the palette color of *cluster_idx*.

    Args:
        cluster: sized point container; only len(cluster) is used.
        cluster_idx: index into the mini color table.

    Returns:
        (len(cluster), 3) ndarray, all rows identical.
    """
    colors = np.zeros(shape=(len(cluster), 3))
    # Broadcast the single color over all rows instead of a per-point loop.
    colors[:, :] = mini_color_table(cluster_idx)
    return colors
def read_pointcloud(path, delimiter=' ', hasHeader=True):
    """Read a delimited point-cloud text file into a float ndarray.

    Args:
        path: file path to read.
        delimiter: column separator (default single space).
        hasHeader: when True, the first line is discarded.

    Returns:
        (N, C) float ndarray, one row per non-blank data line.

    Raises:
        ValueError: if any field cannot be parsed as float.
    """
    with open(path, 'r') as f:
        if hasHeader:
            # Get rid of the header line.
            _ = f.readline()
        # Lines from file iteration keep their trailing '\n', so the old
        # `line != ''` filter never matched and a trailing blank line crashed
        # float(''). Filter on the stripped content instead.
        pc = [[float(x) for x in line.rstrip().split(delimiter)]
              for line in f if line.strip()]
    return np.asarray(pc)
def write_pointcloud(file, pc, numCols=6):
    """Dump the first *numCols* columns of *pc* to *file*, preceded by a 'count cols' header line."""
    header = str(len(pc)) + ' ' + str(numCols)
    truncated = pc[:, :numCols]
    np.savetxt(file, truncated, header=header, comments='')
def farthest_point_sampling(pts, K):
    """Greedily pick K mutually far-apart points (by xyz distance) from *pts*.

    Starts from a random point, then repeatedly adds the point farthest from
    the current sample set. Returns *pts* unchanged when it already has fewer
    than K rows.
    """
    if pts.shape[0] < K:
        return pts

    def sq_dist(anchor, all_pts):
        # Squared Euclidean distance on the spatial (xyz) columns only.
        return ((anchor[:3] - all_pts[:, :3]) ** 2).sum(axis=1)

    sampled = np.zeros((K, pts.shape[1]))
    sampled[0] = pts[np.random.randint(len(pts))]
    min_dist = sq_dist(sampled[0], pts)
    for k in range(1, K):
        sampled[k] = pts[np.argmax(min_dist)]
        min_dist = np.minimum(min_dist, sq_dist(sampled[k], pts))
    return sampled
def cluster_per_column(pc, column):
    """Split *pc* into per-label sub-arrays using the integer label in *column*.

    Labels are assumed to run 0..max(label). The original loop used
    range(0, max), which silently dropped the cluster with the highest
    label — fixed by including the maximum.

    Returns:
        List of (M, D) arrays, one per label value (possibly empty arrays
        for unused intermediate labels).
    """
    clusters = []
    for label in range(int(np.max(pc[:, column])) + 1):
        clusters.append(pc[pc[:, column] == label, :])
    return clusters
def cluster_cubes(data, cluster_dims, max_points_per_cluster=-1, min_points_per_cluster=-1):
    """Partition a point cloud into a regular grid of axis-aligned cells.

    Args:
        data: (N, D) array; columns 0..2 are xyz positions.
        cluster_dims: (dx, dy, dz) number of cells per axis.
        max_points_per_cluster: if not -1, downsample each cell to this many
            points via farthest-point sampling.
        min_points_per_cluster: if not -1, drop cells with fewer points.

    Returns:
        List of (M, D) arrays, one per non-empty (and large-enough) cell.
    """
    # The original used `is` on ints, which only works by CPython's small-int
    # caching; compare by value instead.
    if cluster_dims[0] == 1 and cluster_dims[1] == 1 and cluster_dims[2] == 1:
        print("no need to cluster.")
        return [data]

    # Pad the bounding box by 1% so points on the max face index a valid cell.
    # NOTE(review): padding is proportional to the coordinate value, so it
    # shrinks (not grows) the box for negative coordinates — kept as-is for
    # backward compatibility; confirm against callers' data ranges.
    bb_max = data[:, :3].max(axis=0)
    bb_max += bb_max * 0.01
    bb_min = data[:, :3].min(axis=0)
    bb_min -= bb_min * 0.01
    size = bb_max - bb_min

    clusters = {}
    cluster_size = size / np.array(cluster_dims, dtype=np.float32)
    print('Min: ' + str(bb_min) + ' Max: ' + str(bb_max))
    print('Cluster Size: ' + str(cluster_size))

    for row in data:
        # Integer cell coordinate of this point, then flattened cell index.
        cluster_pos = ((row[:3] - bb_min) / cluster_size).astype(int)
        cluster_idx = (cluster_dims[0] * cluster_dims[2] * cluster_pos[1]
                       + cluster_dims[0] * cluster_pos[2] + cluster_pos[0])
        clusters.setdefault(cluster_idx, []).append(row)

    # Apply size filtering and optional farthest-point sampling per cell.
    final_clusters = []
    for cluster in clusters.values():
        c = np.vstack(cluster)
        if min_points_per_cluster != -1 and c.shape[0] < min_points_per_cluster:
            continue
        if max_points_per_cluster != -1:
            final_clusters.append(farthest_point_sampling(c, max_points_per_cluster))
        else:
            final_clusters.append(c)
    return final_clusters
def cluster_dbscan(data, selected_indices, eps, min_samples=5, metric='euclidean', algo='auto'):
    """Run DBSCAN on selected feature columns and return the clusters as arrays.

    Column layout of *data* (per upstream convention):
      0,1,2: position; 3,4,5: normal; 6: type index;
      7-10: one-hot type; 11,12: normal as angles.

    Args:
        data: (N, D) array.
        selected_indices: columns fed to DBSCAN.
        eps, min_samples, metric, algo: forwarded to sklearn's DBSCAN.

    Returns:
        List of (M, D) arrays, one per cluster. Noise points (label -1)
        are excluded.
    """
    db_res = DBSCAN(eps=eps, metric=metric, n_jobs=-1, min_samples=min_samples,
                    algorithm=algo).fit(data[:, selected_indices])
    labels = db_res.labels_

    clusters = {}
    for idx, label in enumerate(labels):
        # BUG FIX: labels are numpy integers, so the original identity test
        # `label is -1` was always False and noise points were kept as their
        # own cluster. Compare by value.
        if label == -1:
            continue
        clusters.setdefault(str(label), []).append(data[idx, :])

    return [np.array(cluster) for cluster in clusters.values()]
def draw_clusters(clusters):
    """Render every cluster as an Open3D point cloud in its palette color."""
    clouds = []
    for idx, cluster in enumerate(clusters):
        cloud = o3d.PointCloud()
        positions = cluster[:, :3]
        cloud.points = o3d.Vector3dVector(positions)
        cloud.colors = o3d.Vector3dVector(clusterToColor(cluster, idx))
        clouds.append(cloud)
    o3d.draw_geometries(clouds)
def write_clusters(path, clusters, type_column=6):
    """Write clusters to a text file.

    Format: first line is the cluster count; then, per cluster, the unique
    values of *type_column* followed by a 'count 6' header and the first six
    columns of each point.

    Args:
        path: output file path.
        clusters: iterable of (M, D) arrays with D > type_column.
        type_column: column holding the per-point type index.
    """
    # BUG FIX: the handle was opened without ever being closed; `with`
    # guarantees flush + close even if savetxt raises.
    with open(path, "w") as file:
        file.write(str(len(clusters)) + "\n")
        for cluster in clusters:
            print("Types: ", cluster[:, type_column])
            types = np.unique(cluster[:, type_column], axis=0)
            np.savetxt(file, types, header='', comments='')
            np.savetxt(file, cluster[:, :6], header=str(len(cluster)) + ' ' + str(6), comments='')
def normalize_pointcloud(pc, factor=1.0):
    """Scale positions by the largest spatial extent and unit-normalize normals, in place.

    Columns 0..2 are divided by (largest xyz extent * factor); columns 3..5
    are rescaled to unit length per row. Mutates *pc* and returns it.

    Args:
        pc: (N, D) float array with D >= 6.
        factor: extra divisor applied to the spatial scale.
    """
    # Renamed locals: the originals shadowed the builtins `max` and `min`.
    col_max = pc.max(axis=0)
    col_min = pc.min(axis=0)
    # Largest extent over the three spatial axes.
    extent = np.max([abs(col_max[0] - col_min[0]),
                     abs(col_max[1] - col_min[1]),
                     abs(col_max[2] - col_min[2])])
    pc[:, 0:3] /= (extent * factor)
    pc[:, 3:6] /= np.linalg.norm(pc[:, 3:6], ord=2, axis=1, keepdims=True)
    return pc
def hierarchical_clustering(data, selected_indices_0, selected_indices_1, eps, min_samples=5, metric='euclidean', algo='auto'):
    """Two-stage DBSCAN: cluster on one feature subset, then re-cluster each result on another.

    Returns the flattened list of second-stage clusters.
    """
    coarse = cluster_dbscan(data, selected_indices_0, eps, min_samples, metric=metric, algo=algo)
    refined = []
    for coarse_cluster in coarse:
        sub = cluster_dbscan(coarse_cluster, selected_indices_1, eps, min_samples, metric=metric, algo=algo)
        refined.extend(sub)
    return refined
def filter_clusters(clusters, min_size):
    """Return only the clusters that hold at least *min_size* points."""
    return [cluster for cluster in clusters if len(cluster) >= min_size]
def split_outliers(pc, columns):
    """Fit a KNN outlier detector on the given columns and split the cloud.

    Returns:
        (inliers, outliers) — rows the detector labeled 0 and 1 respectively.
    """
    # Alternatives previously tried: FeatureBagging(detector_list=[LOF(), KNN()]).
    detector = KNN()
    detector.fit(pc[:, columns])
    inliers = pc[detector.labels_ == 0]
    outliers = pc[detector.labels_ == 1]
    return inliers, outliers