/ main.py
main.py
1 import cv2 2 import numpy as np 3 4 5 # 灰度转换 6 def convert_to_grayscale(image): 7 print("灰度转换已完成 10%") 8 return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) 9 10 11 def gaussian_kernel(size, sigma=1.0): 12 size = int(size) // 2 13 x, y = np.mgrid[-size : size + 1, -size : size + 1] 14 normal = 1 / (2.0 * np.pi * sigma**2) 15 g = np.exp(-((x**2 + y**2) / (2.0 * sigma**2))) * normal 16 return g 17 18 19 # 滤波 20 def apply_gaussian_blur(image, kernel_size=5, sigma=1.0): 21 print("高斯滤波已完成 20%") 22 kernel = gaussian_kernel(kernel_size, sigma) 23 output = np.zeros_like(image) 24 25 # 添加边界填充,以便卷积时边缘像素也能正确处理 26 pad_height = kernel_size // 2 27 pad_width = kernel_size // 2 28 padded_image = np.pad( 29 image, 30 [(pad_height, pad_height), (pad_width, pad_width)], 31 mode="constant", 32 constant_values=0, 33 ) 34 35 # 对图像的每个像素应用高斯核 36 for i in range(image.shape[0]): 37 for j in range(image.shape[1]): 38 region = padded_image[i : i + kernel_size, j : j + kernel_size] 39 output[i, j] = np.sum(region * kernel) 40 41 return output 42 43 44 # 边缘检测 45 def sobel_filters(image): 46 Kx = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]]) 47 Ky = np.array([[1, 2, 1], [0, 0, 0], [-1, -2, -1]]) 48 Ix = np.convolve(image.flatten(), Kx.flatten(), "same").reshape(image.shape) 49 Iy = np.convolve(image.flatten(), Ky.flatten(), "same").reshape(image.shape) 50 G = np.sqrt(Ix**2 + Iy**2) 51 Theta = np.arctan2(Iy, Ix) 52 return G, Theta 53 54 55 def non_max_suppression(gradient, direction): 56 M, N = gradient.shape 57 Z = np.zeros((M, N)) 58 angle = direction * 180.0 / np.pi 59 angle[angle < 0] += 180 60 for i in range(1, M - 1): 61 for j in range(1, N - 1): 62 try: 63 q = 255 64 r = 255 65 if (0 <= angle[i, j] < 45) or (135 <= angle[i, j] <= 180): 66 q = gradient[i, j + 1] 67 r = gradient[i, j - 1] 68 elif 45 <= angle[i, j] < 135: 69 q = gradient[i + 1, j] 70 r = gradient[i - 1, j] 71 72 if gradient[i, j] >= q and gradient[i, j] >= r: 73 Z[i, j] = gradient[i, j] 74 else: 75 Z[i, j] = 0 76 except IndexError: 77 pass 78 return Z 79 80 81 def threshold(image, low, high): 82 strong = 255 83 weak = 25 84 result = np.zeros_like(image) 85 strong_i, strong_j = np.where(image >= high) 86 weak_i, weak_j = np.where((image >= low) & (image < high)) 87 result[strong_i, strong_j] = strong 88 result[weak_i, weak_j] = weak 89 return result 90 91 92 def detect_edges(image, low_threshold=50, high_threshold=150): 93 print("边缘检测已完成 30%") 94 gradient, direction = sobel_filters(image) 95 non_max_img = non_max_suppression(gradient, direction) 96 final_img = threshold(non_max_img, low_threshold, high_threshold) 97 final_img = np.clip(final_img, 0, 255).astype(np.uint8) 98 return final_img 99 100 101 # 提取感兴趣区域ROI:一开始尝试定义为图像下半部分的一个三角形区域。 102 # 改进后,ROI被定义成一个更符合实际车道形状的梯形区域。 103 def region_of_interest(image): 104 height = image.shape[0] 105 width = image.shape[1] 106 polygons = np.array( 107 [ 108 [ 109 (int(width * 0.1), height), 110 (int(width * 0.9), height), 111 (int(width * 0.55), int(height * 0.6)), 112 (int(width * 0.45), int(height * 0.6)), 113 ] 114 ] 115 ) 116 mask = np.zeros_like(image) 117 cv2.fillPoly(mask, polygons, 255) 118 masked_image = cv2.bitwise_and(image, mask) 119 print("提取感兴趣区域已完成 40%") 120 return masked_image 121 122 123 # 霍夫变换检测直线:通过降低 minLineLength (从100降到20)和增加 maxLineGap (从50增加到300)等, 124 def hough_transform(image, theta_res=1, rho_res=1, threshold=20): 125 height, width = image.shape 126 max_dist = int(np.hypot(height, width)) 127 rhos = np.arange(-max_dist, max_dist, rho_res) 128 thetas = np.radians(np.arange(-90, 90, theta_res)) 129 130 cos_t = np.cos(thetas) 131 sin_t = np.sin(thetas) 132 num_thetas = len(thetas) 133 134 accumulator = np.zeros((2 * max_dist, num_thetas), dtype=np.int32) 135 y_idxs, x_idxs = np.nonzero(image) 136 137 for i in range(len(x_idxs)): 138 x = x_idxs[i] 139 y = y_idxs[i] 140 for t_idx in range(num_thetas): 141 rho = int(round(x * cos_t[t_idx] + y * sin_t[t_idx]) + max_dist) 142 accumulator[rho, t_idx] += 1 143 144 lines = [] 145 for r_idx in range(accumulator.shape[0]): 146 for t_idx in range(accumulator.shape[1]): 147 if accumulator[r_idx, t_idx] > threshold: 148 rho = rhos[r_idx] 149 theta = thetas[t_idx] 150 a = np.cos(theta) 151 b = np.sin(theta) 152 x0 = a * rho 153 y0 = b * rho 154 x1 = int(x0 + 1000 * (-b)) 155 y1 = int(y0 + 1000 * (a)) 156 x2 = int(x0 - 1000 * (-b)) 157 y2 = int(y0 - 1000 * (a)) 158 lines.append([x1, y1, x2, y2]) 159 160 return np.array(lines).reshape(-1, 1, 4) 161 162 163 # 线段后处理:对检测到的线段进行后处理,计算左右车道线的平均斜率和截距; 164 # 并对检测到的线段进行合并,减少了由于噪声和断裂导致的检测不连续性,使检测结果更加稳定和连续。 165 def average_slope_intercept(image, lines): 166 left_lines = [] # (slope, intercept) 167 right_lines = [] # (slope, intercept) 168 left_weights = [] # length of the line segment 169 right_weights = [] # length of the line segment 170 171 for line in lines: 172 for x1, y1, x2, y2 in line: 173 if x1 == x2: # 两个端点的 x 坐标相同,表示这是一条垂直线。跳过 174 continue # skip vertical lines 175 slope = (y2 - y1) / (x2 - x1) # 斜率 176 intercept = y1 - slope * x1 # 截距 177 length = np.sqrt((y2 - y1) ** 2 + (x2 - x1) ** 2) # 长度 178 if slope < 0: # left lane 左 179 left_lines.append((slope, intercept)) 180 left_weights.append(length) 181 else: # right lane 右 182 right_lines.append((slope, intercept)) 183 right_weights.append(length) 184 185 # 加权平均的方法来确定每侧车道的代表性直线 186 # 每条线的 (slope, intercept) 与其长度作为权重相乘,然后除以所有权重的和 187 left_lane = ( 188 np.dot(left_weights, left_lines) / np.sum(left_weights) 189 if len(left_weights) > 0 190 else None 191 ) 192 right_lane = ( 193 np.dot(right_weights, right_lines) / np.sum(right_weights) 194 if len(right_weights) > 0 195 else None 196 ) 197 print("线段后处理已完成 80%") 198 return left_lane, right_lane 199 200 201 def make_line_points(y1, y2, line): 202 if line is None: 203 return None 204 slope, intercept = line 205 x1 = int((y1 - intercept) / slope) 206 x2 = int((y2 - intercept) / slope) 207 y1 = int(y1) 208 y2 = int(y2) 209 return ((x1, y1), (x2, y2)) 210 211 212 def draw_lines(image, lines): 213 line_image = np.zeros_like(image) 214 if lines is not None: 215 left_lane, right_lane = average_slope_intercept(image, lines) 216 y1 = image.shape[0] 217 y2 = y1 * 0.6 218 left_line = make_line_points(y1, y2, left_lane) 219 right_line = make_line_points(y1, y2, right_lane) 220 for line in [left_line, right_line]: 221 if line is not None: 222 cv2.line(line_image, *line, (0, 255, 0), 10) # 绿色线条 223 print("车道线绘制已完成 90%") 224 return line_image 225 226 227 # 处理单帧图像 228 def process_image(image): 229 gray = convert_to_grayscale(image) 230 blurred = apply_gaussian_blur(gray) 231 edges = detect_edges(blurred) 232 roi = region_of_interest(edges) 233 lines = hough_transform(roi) 234 line_image = draw_lines(image, lines) 235 combined = cv2.addWeighted(image, 0.8, line_image, 1, 1) 236 print("单帧图像处理已完成 100%") 237 return combined 238 239 240 # 处理视频 241 def process_video(input_video_path, output_video_path): 242 cap = cv2.VideoCapture(input_video_path) 243 fourcc = cv2.VideoWriter_fourcc(*"mp4v") 244 out = cv2.VideoWriter( 245 output_video_path, fourcc, 20.0, (int(cap.get(3)), int(cap.get(4))) 246 ) 247 248 while cap.isOpened(): 249 ret, frame = cap.read() 250 if not ret: 251 break 252 processed_frame = process_image(frame) 253 out.write(processed_frame) 254 print( 255 "视频处理进度: {:.2f}%".format( 256 (cap.get(cv2.CAP_PROP_POS_FRAMES) / cap.get(cv2.CAP_PROP_FRAME_COUNT)) 257 * 100 258 ) 259 ) 260 261 cap.release() 262 out.release() 263 # cv2.destroyAllWindows() 264 print("视频处理已完成 100%") 265 266 267 # 处理视频 268 process_video("drive.mp4", "output_drive_V3.mp4")