Cpp性能测试

实验一

  • 先确定测量指标
  • 从实际运行搜集输入
  • 不要对性能进行假设
  • 使用微基准测试深入理解代码
  • 部分有益的优化,可能对程序整体有害
  • 高性能程序的开发与优化,不是线性过程,可能需要在高级概览和低级细节中反复迭代测试
  • 在性能方面,没有什么是显而易见的
  • 看似不必要的代码,删除之后,可能会导致性能下降,请搞清楚如何有效利用 CPU 以获得性能提升

Pre-requirements

1
2
3
4
5
6
7
8
sudo pacman -S gperftools

git clone https://github.com/google/benchmark.git --depth=1

cd benchmark
cmake -E make_directory "build"
cmake -E chdir "build" cmake -DBENCHMARK_DOWNLOAD_DEPENDENCIES=on -DCMAKE_BUILD_TYPE=Release ../
cmake --build "build" --config Release
1
2
// code
git clone git@github.com:PacktPublishing/The-Art-of-Writing-Efficient-Programs.git

gperftools

1
clang++ -g -O3 -mavx2 -Wall -pedantic 02_substring_sort.C -o example && CPUPROFILE=prof.data ./example && pprof --text ./example prof.data
1
clang++ -g -O3 -mavx2 -Wall -pedantic 03_substring_sort.C -o example && CPUPROFILE=prof.data ./example && pprof --text ./example prof.data
1
clang++ -g -O3 -mavx2 -Wall -pedantic 04_substring_sort.C -o example && CPUPROFILE=prof.data ./example && pprof --text ./example prof.data

clang++ 13 在 -O3 下的结果,和 clang++-11 的结果不同。

对 compare 函数的优化,13 实在是太厉害了。

perf

使用硬件性能计数器和基于时间的采样。

1
2
3
4
5
6
7
8
// 可选事件列表
perf list

perf stat ./example

// 性能分析器
perf record ./example
perf report

由于编译器优化,指令重排等,perf 显示的汇编代码和源代码的对应关系并不准确。

Google Performance

1
2
3
4
5
6
// -lprofiler
clang++ -g -O3 -mavx2 -Wall -pedantic -lprofiler 02_substring_sort.C -o example

CPUPROFILE=prof.data CPUPROFILE_FREQUENCY=1000 ./example

pprof ./example prof.data
1
2
// 调用图
pprof --pdf ./example prof.data > prof.pdf

微基准测试

大型程序性能测试:

  • 整体编译时间长
  • 性能关键部分不那么明显
  • 目标上下文发生的概率可能较小,比如特定的网络请求时发生

微基准测试,关注某个函数的测试。通过设置函数的起始条件,轻松调用此代码片段。

另外,微基准测试严重依赖运行代码时程序所处的上下文环境。测试时,将目标函数单独放在一个编译单元中编译,可以缓解编译器优化对目标函数测试结果的影响。

微基准测试,不可不信,不可全信。

编译器优化

C++ 标准中,定义了一个关键概念:可观察行为(observable behavior)。

编译器可以对程序进行任何它想要的更改,只要这些更改的结果不会改变可观察行为的结果

  • volatile 对象的访问,严格按照表达式的语义,不会对其重新排序
  • 程序终止时,写入文件的数据,必须与严格按照程序语义执行时写入的内容一致
  • 发送到交互设备的提示文本,将在程序等待输入之前,输出显示。输入和输出操作不能被省略或者重排
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>

using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::chrono::system_clock;
using std::cout;
using std::endl;
using std::unique_ptr;

// Lexicographic "greater-than" comparison driven by signed int indices.
// Walks both strings in lockstep and, at the first differing position,
// reports whether s1's character is greater. Precondition: the strings
// differ at some position (otherwise the loop never terminates).
bool compare1(const char* s1, const char* s2) {
    for (int i1 = 0, i2 = 0; ; ++i1, ++i2) {
        const char c1 = s1[i1];
        const char c2 = s2[i2];
        if (c1 != c2) return c1 > c2;
    }
}

// Same comparison as compare1, but indexed with unsigned int — the two
// versions exist to contrast the code the optimizer generates for signed
// vs. unsigned induction variables. Precondition: the strings differ at
// some position (otherwise the loop never terminates).
bool compare2(const char* s1, const char* s2) {
    for (unsigned int i1 = 0, i2 = 0; ; ++i1, ++i2) {
        const char c1 = s1[i1];
        const char c2 = s2[i2];
        if (c1 != c2) return c1 > c2;
    }
}

// Driver: times NI calls of compare1/compare2 over two N-byte halves of
// one buffer. The compare results are deliberately discarded — they are
// not "observable behavior" — so under -O3 the compiler may delete the
// calls entirely and both timings print as 0us. That is the point of
// this example; do not "fix" the discarded return values.
int main() {
constexpr unsigned int N = 1 << 20;
constexpr int NI = 1 << 11;
unique_ptr<char[]> s(new char[2*N]);
::memset(s.get(), 'a', 2*N*sizeof(char));
// Only the final byte differs (NUL), so a full compare scans N chars.
s[2*N-1] = 0;
system_clock::time_point t0 = system_clock::now();
for (int i = 0; i < NI; ++i) {
compare1(s.get(), s.get() + N);
}
system_clock::time_point t1 = system_clock::now();
for (int i = 0; i < NI; ++i) {
compare2(s.get(), s.get() + N);
}
system_clock::time_point t2 = system_clock::now();
cout << duration_cast<microseconds>(t1 - t0).count() << "us " << duration_cast<microseconds>(t2 - t1).count() << "us" << endl;
}

这段程序,compare1、compare2 不会改变程序中 s 的可观察行为。编译器直接跳过执行。输出时间是 0。

使用 volatile 可以正常执行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>

using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::chrono::system_clock;
using std::cout;
using std::endl;
using std::unique_ptr;

// Greater-than string comparison, signed int indices.
// Returns true iff s1 compares greater at the first differing character.
// Precondition: a differing character exists (no terminating condition
// in the loop otherwise).
bool compare1(const char* s1, const char* s2) {
    int i1 = 0;
    int i2 = 0;
    while (true) {
        const char c1 = s1[i1];
        const char c2 = s2[i2];
        if (c1 != c2) return c1 > c2;
        ++i1;
        ++i2;
    }
}

// Greater-than string comparison, unsigned int indices — the unsigned
// twin of compare1, kept to compare generated code for the two index
// types. Precondition: a differing character exists.
bool compare2(const char* s1, const char* s2) {
    unsigned int i1 = 0;
    unsigned int i2 = 0;
    while (true) {
        const char c1 = s1[i1];
        const char c2 = s2[i2];
        if (c1 != c2) return c1 > c2;
        ++i1;
        ++i2;
    }
}

// Same driver as before, but every compare result is written to a
// volatile sink. Writes to a volatile object are observable behavior,
// so the compiler must keep the calls and the loops now measure real
// work (non-zero timings).
int main() {
constexpr unsigned int N = 1 << 20;
constexpr int NI = 1 << 11;
unique_ptr<char[]> s(new char[2*N]);
::memset(s.get(), 'a', 2*N*sizeof(char));
// Only the final byte differs (NUL), so a full compare scans N chars.
s[2*N-1] = 0;
volatile bool sink; // difference: volatile forces the stores to happen
system_clock::time_point t0 = system_clock::now();
for (int i = 0; i < NI; ++i) {
sink = compare1(s.get(), s.get() + N); // difference
}
system_clock::time_point t1 = system_clock::now();
for (int i = 0; i < NI; ++i) {
sink = compare2(s.get(), s.get() + N); // difference
}
system_clock::time_point t2 = system_clock::now();
cout << duration_cast<microseconds>(t1 - t0).count() << "us " << duration_cast<microseconds>(t2 - t1).count() << "us" << endl;
}

Google benchmark

1
2
3
4
5
6
7
8
9
10
11
git clone https://github.com/google/benchmark.git --depth=1

cd benchmark
cmake -E make_directory "build"
cmake -E chdir "build" cmake -DBENCHMARK_DOWNLOAD_DEPENDENCIES=on -DCMAKE_BUILD_TYPE=Release ../
cmake --build "build" --config Release

export GBEN_INC=xxx/benchmark/include/
export GBEN_LIB=xxx/benchmark/build/src/

clang++ -g -O3 -mavx2 -Wall -pedantic -I$GBEN_INC -lpthread -lrt -lm 10a_compare_mbm.C $GBEN_LIB/libbenchmark.a -o benchmark
1
./benchmark
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include <cstdlib>
#include <cstring>
#include <memory>

#include "benchmark/benchmark.h"

using std::unique_ptr;

// Greater-than string comparison using signed int indices (micro-benchmark
// subject). Precondition: the strings differ at some position; otherwise
// the scan never terminates.
bool compare_int(const char* s1, const char* s2) {
    int i1 = 0, i2 = 0;
    while (true) {
        const char c1 = s1[i1];
        const char c2 = s2[i2];
        if (c1 != c2) return c1 > c2;
        ++i1; ++i2;
    }
}

// Greater-than string comparison using unsigned int indices — contrasted
// with compare_int to expose the optimizer's treatment of unsigned
// induction variables. Precondition: the strings differ at some position.
bool compare_uint(const char* s1, const char* s2) {
    unsigned int i1 = 0, i2 = 0;
    while (true) {
        const char c1 = s1[i1];
        const char c2 = s2[i2];
        if (c1 != c2) return c1 > c2;
        ++i1; ++i2;
    }
}

// Bounded greater-than comparison: examines at most l characters.
// Returns false for identical pointers and for strings equal over the
// first l characters; otherwise reports s1 > s2 at the first difference.
bool compare_uint_l(const char* s1, const char* s2, unsigned int l) {
    if (s1 == s2) return false; // same string: not "greater"
    unsigned int i1 = 0, i2 = 0;
    while (i1 < l) {
        const char c1 = s1[i1];
        const char c2 = s2[i2];
        if (c1 != c2) return c1 > c2;
        ++i1; ++i2;
    }
    return false; // equal over the first l characters
}

// Benchmark: compare_int over two N-char halves of one buffer (only the
// last byte differs, so each call scans N characters). DoNotOptimize
// keeps the call from being dead-code-eliminated across iterations.
void BM_loop_int(benchmark::State& state) {
const unsigned int N = state.range(0);
unique_ptr<char[]> s(new char[2*N]);
::memset(s.get(), 'a', 2*N*sizeof(char));
s[2*N-1] = 0;
const char* s1 = s.get(), *s2 = s1 + N;
for (auto _ : state) {
benchmark::DoNotOptimize(compare_int(s1, s2));
}
state.SetItemsProcessed(N*state.iterations());
}

// Benchmark: same setup as BM_loop_int, but exercising the
// unsigned-index variant compare_uint.
void BM_loop_uint(benchmark::State& state) {
const unsigned int N = state.range(0);
unique_ptr<char[]> s(new char[2*N]);
::memset(s.get(), 'a', 2*N*sizeof(char));
s[2*N-1] = 0;
const char* s1 = s.get(), *s2 = s1 + N;
for (auto _ : state) {
benchmark::DoNotOptimize(compare_uint(s1, s2));
}
state.SetItemsProcessed(N*state.iterations());
}

// Benchmark: length-bounded variant compare_uint_l with an explicit
// limit of 2*N characters (the scan still stops at the first difference
// after N characters).
void BM_loop_uint_l(benchmark::State& state) {
const unsigned int N = state.range(0);
unique_ptr<char[]> s(new char[2*N]);
::memset(s.get(), 'a', 2*N*sizeof(char));
s[2*N-1] = 0;
const char* s1 = s.get(), *s2 = s1 + N;
for (auto _ : state) {
benchmark::DoNotOptimize(compare_uint_l(s1, s2, 2*N));
}
state.SetItemsProcessed(N*state.iterations());
}

// Register each variant with a 1 MiB input (delivered as state.range(0)).
#define ARGS ->Arg(1<<20)

BENCHMARK(BM_loop_int) ARGS;
BENCHMARK(BM_loop_uint) ARGS;
BENCHMARK(BM_loop_uint_l) ARGS;

BENCHMARK_MAIN();

benchmark::State& state : 入参;state.SetItemsProcessed : 报告代码每秒处理的字符数;->Arg(1<<20) : 向 BENCHMARK 宏传递参数。

benchmark::DoNotOptimize : 类似 volatile sink 的作用,不会将 compare_uint 函数调用优化掉,但是函数内部逻辑还是会进行优化。

实验二 高效利用 CPU 资源

CPU 指令运行优化

  • 对于每次循环,独立地,对寄存器中相同的数据,进行不同的操作,是可以指令级并行的。

  • 数据依赖依靠流水线技术,错位执行。每个循环使用相同的寄存器,对于同一个寄存器不能在一个周期内既读又写。但是这是对于编译器而言,实际上硬件上的寄存器并不一定相同。寄存器重命名,可以使得相同寄存器名称实际在运行时可以对应到不同的硬件寄存器。

  • 流水线技术需要足够多的指令才能发挥出其性能优势。如果出现分支,那么需要分支预测,来尽可能提供更多的指令来运行。

  • 分支预测 + 推测执行,处理器会根据历史运行结果,推测是否需要执行某个分支的指令。如果预测正确,并行化提高了性能;如果预测错误,就需要冲刷流水线,丢弃预测错误的指令。

    perf stat ./benchmark

    可以查看分支预测的情况

  • 推测执行可能会遇到非法操作,比如 *ptr 解引用了 NULL 指针。但由于 CPU 是在内部缓冲区中进行推测执行的,预测失败时这些结果会被直接丢弃,CPU 表现得好像这次执行从未发生。

  • 循环展开也许会对现代编译器下的代码产生有益的影响,但是不是一定的。因为许多向量化编译器,会使用 SSE 或者 AVX 指令来优化循环的执行,一次处理多个元素。所以,手动循环展开,不一定会更高效。

测试运算指令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
#include <stdlib.h>
#include <string.h>

#include "benchmark/benchmark.h"

// Benchmark: element-wise addition throughput over two random vectors of
// N = state.range(0) elements. a2 is unused here; it is kept so every
// benchmark in this file has an identical shape.
void BM_add(benchmark::State& state) {
srand(1); // fixed seed: reproducible inputs across runs
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
a1 += p1[i] + p2[i];
}
// Keep the compiler from optimizing away the identical work repeated
// across benchmark iterations; optimizations *inside* the inner loop
// still apply.
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
// Force results out to memory — an efficient memory barrier.
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: element-wise multiplication throughput (compare against
// BM_add to see relative instruction cost). a1 is unused, kept for shape.
void BM_multiply(benchmark::State& state) {
srand(1); // fixed seed: reproducible inputs
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
a2 += p1[i] * p2[i];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: element-wise integer division throughput — typically the
// slowest of the arithmetic benchmarks in this set.
void BM_divide(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand() + 1; // +1 guarantees a non-zero divisor
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
a2 += p1[i] / p2[i];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: independent add and multiply per element, accumulated into
// separate registers — demonstrates instruction-level parallelism: the
// two operations can execute concurrently.
void BM_add_multiply(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
a1 += p1[i] + p2[i];
a2 += p1[i] * p2[i];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: like BM_add_multiply, but the add and multiply read from
// four distinct arrays, removing even the shared input streams.
void BM_add_multiply2(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<unsigned long> v3(N), v4(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
v3[i] = rand();
v4[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
unsigned long* p3 = v3.data();
unsigned long* p4 = v4.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
a1 += p1[i] + p2[i];
a2 += p3[i] * p4[i];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: four independent operations (add, multiply, shift, subtract)
// per element into four accumulators — probing how many independent
// instruction streams the CPU can sustain per cycle.
void BM_add2_multiply_sub_shift(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
unsigned long a3 = 0, a4 = 0;
for (size_t i = 0; i < N; ++i) {
a1 += p1[i] + p2[i];
a2 += p1[i] * p2[i];
a3 += p1[i] << 2;
a4 += p2[i] - p1[i];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::DoNotOptimize(a3);
benchmark::DoNotOptimize(a4);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: six independent simple operations per element — extends
// BM_add2_multiply_sub_shift to see where ILP stops scaling.
void BM_instructions1(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
unsigned long a3 = 0, a4 = 0;
unsigned long a5 = 0, a6 = 0;
for (size_t i = 0; i < N; ++i) {
a1 += p1[i] + p2[i];
a2 += p1[i] * p2[i];
a3 += p1[i] << 2;
a4 += p2[i] - p1[i];
a5 += p2[i] << 1;
a6 += (p2[i] - 3);
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::DoNotOptimize(a3);
benchmark::DoNotOptimize(a4);
benchmark::DoNotOptimize(a5);
benchmark::DoNotOptimize(a6);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: like BM_instructions1, but the last two streams chain a
// shift/subtract into a multiply, adding per-stream data dependencies.
void BM_instructions2(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
unsigned long a3 = 0, a4 = 0;
unsigned long a5 = 0, a6 = 0;
for (size_t i = 0; i < N; ++i) {
a1 += p1[i] + p2[i];
a2 += p1[i] * p2[i];
a3 += p1[i] << 2;
a4 += p2[i] - p1[i];
a5 += (p2[i] << 1)*p2[i];
a6 += (p2[i] - 3)*p1[i];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::DoNotOptimize(a3);
benchmark::DoNotOptimize(a4);
benchmark::DoNotOptimize(a5);
benchmark::DoNotOptimize(a6);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: four independent operations reading from four input arrays,
// so neither the operations nor their input streams are shared.
void BM_add2_multiply_sub_shift4(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<unsigned long> v3(N), v4(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
v3[i] = rand();
v4[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
unsigned long* p3 = v3.data();
unsigned long* p4 = v4.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
unsigned long a3 = 0, a4 = 0;
for (size_t i = 0; i < N; ++i) {
a1 += p1[i] + p2[i];
a2 += p2[i] * p3[i];
a3 += p4[i] << 2;
a4 += p3[i] - p4[i];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::DoNotOptimize(a3);
benchmark::DoNotOptimize(a4);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: per-element maximum via the ternary operator — on random
// data the condition is unpredictable, but compilers typically lower
// this to a branch-free conditional move. a1 is unused, kept for shape.
void BM_max(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
a2 += (p1[i] > p2[i]) ? p1[i] : p2[i];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Benchmark: add, subtract and multiply feed one accumulator — a single
// dependency chain, so the operations cannot overlap the way the
// independent-accumulator benchmarks above do.
void BM_add_multiply_dep(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
a1 += (p1[i] + p2[i]) * (p1[i] - p2[i]);
//a1 += (p1[i] * p2[i]);
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Register every benchmark with 4 Mi elements (state.range(0) = 1<<22).
#define ARGS \
->Arg(1<<22)

BENCHMARK(BM_add) ARGS;
BENCHMARK(BM_multiply) ARGS;
BENCHMARK(BM_divide) ARGS;
BENCHMARK(BM_add_multiply) ARGS;
BENCHMARK(BM_add_multiply2) ARGS;
BENCHMARK(BM_add2_multiply_sub_shift) ARGS;
BENCHMARK(BM_instructions1) ARGS;
BENCHMARK(BM_instructions2) ARGS;
BENCHMARK(BM_add2_multiply_sub_shift4) ARGS;
BENCHMARK(BM_max) ARGS;
BENCHMARK(BM_add_multiply_dep) ARGS;

BENCHMARK_MAIN();

无分支计算

无分支计算可以消除或者减轻分支预测失败带来的性能损失。

但是,如果消除分支的代价是,增加了非内联的复杂函数的调用次数,那么函数调用的代价会比分支预测失败的代价大。

因为函数调用本身就会中断流水线,这也是内联函数高效的原因之一。

1
2
3
// 编译命令
clang++ -g -O3 -mavx2 -Wall -pedantic -I$GBEN_INC -lpthread -lrt -lm 02_branch.C $GBEN_LIB/libbenchmark.a -o benchmark
1
2
3
4
5
6
7
#include <stdlib.h>
#include <string.h>
#include <iostream>
// Region markers for llvm-mca: only code between BEGIN and END is analyzed.
#define MCA_START __asm volatile("# LLVM-MCA-BEGIN");
#define MCA_END __asm volatile("# LLVM-MCA-END");

#include "benchmark/benchmark.h"

例一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Benchmark: accumulate p1[i] into a1 or a2 depending on a random 0/1
// condition — the if/else branch is unpredictable by design. The #if 0
// alternative is the ternary form some compilers lower to a lookup.
void BM_branched(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<int> c1(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
c1[i] = rand() & 0x1; // random 0/1: worst case for the branch predictor
}
unsigned long* p1 = v1.data();
int* b1 = c1.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
#if 0
(b1[i] ? a1 : a2) += p1[i];
#else
if (b1[i]) {
a1 += p1[i];
} else {
a2 += p1[i];
}
#endif // 1
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

使用数组,消除分支,通过索引在不同的内存单元进行计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Branchless version of BM_branched: instead of an if/else, index an
// array of pointers to the two accumulators with the 0/1 condition.
//
// Fix: the original wrote `a[b1[i]] += p1[i];`, which advances the
// *pointer* stored in a[] by p1[i] elements (out-of-bounds pointer
// arithmetic — undefined behavior) and never updates a1/a2. The pointee
// must be incremented, so the assignment now dereferences the pointer.
void BM_branchless(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<int> c1(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
c1[i] = rand() & 0x1; // random 0/1 selector
}
unsigned long* p1 = v1.data();
int* b1 = c1.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
unsigned long* a[2] = { &a2, &a1 }; // a[0] -> a2 (false), a[1] -> a1 (true)
for (size_t i = 0; i < N; ++i) {
*a[b1[i]] += p1[i]; // dereference: accumulate into a1 or a2
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

(b1[i] ? a1 : a2) 在某些编译器中,会使用查找数组的方式优化,也就是上面的方式。所以,比使用条件分支代码更高效。

例二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Benchmark: branch selects both the accumulator AND the source array
// (a1 += p1[i] vs a2 += p2[i]) on a random condition.
void BM_branched1(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<int> c1(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
c1[i] = rand() & 0x1; // random 0/1: unpredictable branch
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
int* b1 = c1.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
if (b1[i]) {
a1 += p1[i];
} else {
a2 += p2[i];
}
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

使用两个中间值数组,进行分支消除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Branchless version of BM_branched1: per-iteration two-element value
// arrays (one holding the wanted operand, the other zero) replace the
// if/else — both accumulators are updated unconditionally, one by zero.
void BM_branchless1(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<int> c1(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
c1[i] = rand() & 0x1; // random 0/1 selector
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
int* b1 = c1.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
unsigned long s1[2] = { 0, p1[i] }; // selector 1 -> p1[i], 0 -> nothing
unsigned long s2[2] = { p2[i], 0 }; // selector 0 -> p2[i], 1 -> nothing
a1 += s1[b1[i]];
a2 += s2[b1[i]];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

例三

基于随机值的条件分支,处理器无法对其进行正确率高的分支预测。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Benchmark: an unpredictable branch selecting between two different
// computations (subtract vs multiply) — each misprediction flushes the
// pipeline.
void BM_branched2(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<int> c1(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
c1[i] = rand() & 0x1; // random 0/1: ~50% misprediction
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
int* b1 = c1.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
if (b1[i]) {
a1 += p1[i] - p2[i];
} else {
a2 += p1[i] * p2[i];
}
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

基于可预测条件的分支版本(此处 rand() > 0 几乎恒为真),处理器可以进行正确率很高的分支预测,程序会被处理器优化执行,实测性能与无分支版本相当,甚至更好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Same code shape as BM_branched2, but the condition `rand() > 0` is
// (almost) always true, so the branch predictor is nearly always right —
// isolating the cost of the branch itself from misprediction cost.
void BM_branched2_predicted(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<int> c1(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
c1[i] = rand() > 0; // effectively constant-true: trivially predictable
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
int* b1 = c1.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
if (b1[i]) {
a1 += p1[i] - p2[i];
} else {
a2 += p1[i] * p2[i];
}
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

无分支版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Branchless version of BM_branched2: both the subtraction and the
// multiplication are computed every iteration (the price of removing
// the branch) and per-iteration selector arrays pick which one counts.
void BM_branchless2(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<int> c1(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
c1[i] = rand() & 0x1; // random 0/1 selector
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
int* b1 = c1.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
for (size_t i = 0; i < N; ++i) {
unsigned long s1[2] = { 0, p1[i] - p2[i] }; // selector 1 -> difference
unsigned long s2[2] = { p1[i] * p2[i], 0 }; // selector 0 -> product
a1 += s1[b1[i]];
a2 += s2[b1[i]];
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

// Alternative branchless form: one value array selects the computed
// result (product or difference) and one pointer array selects the
// accumulator to receive it.
//
// Fix: the original wrote `a[b1[i]] += s[b1[i]];`, which advances the
// *pointer* stored in a[] (out-of-bounds pointer arithmetic — undefined
// behavior) instead of adding to the accumulator; a1/a2 were never
// updated. The pointee must be incremented, so the assignment now
// dereferences the selected pointer.
void BM_branchless2a(benchmark::State& state) {
srand(1);
const unsigned int N = state.range(0);
std::vector<unsigned long> v1(N), v2(N);
std::vector<int> c1(N);
for (size_t i = 0; i < N; ++i) {
v1[i] = rand();
v2[i] = rand();
c1[i] = rand() & 0x1; // random 0/1 selector
}
unsigned long* p1 = v1.data();
unsigned long* p2 = v2.data();
int* b1 = c1.data();
for (auto _ : state) {
unsigned long a1 = 0, a2 = 0;
unsigned long* a[2] = { &a2, &a1 }; // a[0] -> a2 (false), a[1] -> a1 (true)
for (size_t i = 0; i < N; ++i) {
unsigned long s[2] = { p1[i] * p2[i], p1[i] - p2[i] }; // both computed unconditionally
*a[b1[i]] += s[b1[i]]; // dereference: accumulate into a1 or a2
}
benchmark::DoNotOptimize(a1);
benchmark::DoNotOptimize(a2);
benchmark::ClobberMemory();
}
state.SetItemsProcessed(N*state.iterations());
//state.SetBytesProcessed(N*sizeof(unsigned long)*state.iterations());
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!