Reed-Solomon码:从入门到放弃 Part 2

目录#

多项式运算#

之前一直在说明系数都是GF(2)的多项式运算,即多项式$x$的系数只有两个值:要么是0,要么是1。而$x$能取的值亦是如此,要么是0,要么也是1。

而在踏入RS码领域之前的最后一项准备工作,就是摸清系数为GF(2^8)的多项式的运算规则,这样就能够直接以byte形式处理数据,而不再需要关注bit层面的运算,that’s enough。这里依旧使用$x$作为多项式的占位符。在后面,我们统一使用十六进制表示多项式的系数,如:

01 $x^4+$ 0f $x^3+$ 36 $x^2+$ 78 $x+$ 40 $=$ 0000 0001 $x^4+$ 0000 1111 $x^3+$ 0011 0110 $x^2+$ 0111 1000 $x+$ 0100 0000

在Python,可以用list表示其系数:[ 0x01, 0x0f, 0x36, 0x78, 0x40 ]

首先是多项式与标量的运算:

1
2
3
4
5
6
7
from typing import List

def gf_poly_scale(p: List[int], x: int) -> List[int]:
r = [0] * len(p)
for i in range(len(p)):
r[i] = gf_mul_lut(p[i], x)
return r

接下来是多项式的加法运算:

1
2
3
4
5
6
7
def gf_poly_add(p: List[int], q: List[int]) -> List[int]:
r = [0] * max(len(p), len(q))
for i in range(len(p)):
r[i + len(r) - len(p)] = p[i]
for i in range(len(q)):
r[i + len(r) - len(q)] ^= q[i]
return r

然后是多项式的乘法运算:

1
2
3
4
5
6
def gf_poly_mul(p: List[int], q: List[int]) -> List[int]:
r = [0] * (len(p) + len(q) - 1)
for j in range(len(q)):
for i in range(len(p)):
r[i+j] ^= gf_mul_lut(p[i], q[j])
return r

最后,是给定$x$的值,计算多项式的数值的方法:

1
2
3
4
5
def gf_poly_eval(p: List[int], x: int) -> int:
y = p[0]
for i in range(1, len(p)):
y = gf_mul_lut(y, x) ^ p[i]
return y

本例中采用的是秦九韶算法(Horner’s method),将$x$的整数次幂的计算分解为加法和乘法两个过程,即:

01 $x^4+$ 0f $x^3+$ 36 $x^2+$ 78 $x+$ 40 $=$ (((01 $x+$ 0f)$x+$ 36)$x+$ 78)$x+$ 40

虽然这种方法避开了$x$的整数幂的计算,但是在代码实现中,使用该方法引入的数据依赖所带来的额外时间开销,远比计算$x$的整数幂要大得多(这个在Part 4中有机会再详解)。这是也是我重写解码模块的直接原因。

举个夸张的例子直观讲述:你有很多个很慢很慢的计算器,算个乘法要3秒,算个加法要1秒,而你则很神速,按计算器的操作可以瞬间完成。假设$x=1$。使用秦九韶算法,首先需要花3秒计算01乘$x$的值01,再花1秒加上0f,才能得到结果0e。在等待结果的时候,你是只能干等着什么事情都干不了,让时间白白浪费掉。而对于初始的方法,首先你可以用第一个计算器计算$x^4$,接着马上用第二个计算器计算$x^3$……直到结果出来,再与系数010f等相乘,最后相加,你会发现计算器的利用率上升了不少,得到结果的速度也快很多。当然,现代的CPU指令和编译器也与之相似,当没有数据依赖时,编译器可以随机打乱每一条汇编指令,让每个时刻CPU都在忙着(相当于后面那个例子中,你要手忙脚乱地处理每个结果一样),而不是光坐着等结果(即前面的例子)。学过机组的人都很容易理解,计算器就是ALU,而自己则是一个取指令译码以及读写数据的工具人。

在实际的使用当中,$x$的指数非常大,通常会有$x^{255}$(GF(2^8))、$x^{511}$(GF(2^9))这种常见情况,个人认为,引入这个方法是原文的唯一一大败笔。至于如何修改,就留作读者思考了。

RS码#

终于来到激动人心的时刻了。之所以花费大篇章描述有限域算数和多项式,是因为RS码将数据视为多项式系数,利用完整定义的有限域运算规则生成纠错码。与开篇的BCH码相似,RS码也是对数据添加一段额外的纠错码用于数据的检查与纠正。

RS码的编码过程#

RS码的编码过程与BCH码相似,将数据视为一个多项式,使用不可约的生成多项式与其相除,将得到的余数作为纠错码,添加到原始数据的末端。

在GF(2^8)的情况下,由原始数据与生成的纠错码所组成的数据的长度不能超过(2^8)-1=255字节。

RS码的生成多项式#

RS码使用与BCH码相似的生成多项式(不要与先前的本原元$\alpha$搞混)。生成多项式是$(x-\alpha^n)$项的乘积($n=0$开始)。举个例子,纠错码长度为4时的生成多项式为(省略了化简步骤):

因此得到的是$g_4(x)=$ 01 $x^4+$ 0f $x^3+$ 36 $x^2+$ 78 $x+$ 40

当然,这只是个理想情况,在实际中会出现超过$\alpha^8$的情况,需要与本原多项式进行取余,这时候得到的生成多项式就不会这么直观了。而这操作对应的代码实现为:

1
2
3
4
5
def rs_generator_poly(nsym: int) -> List[int]:
g = [1]
for i in range(nsym):
g = gf_poly_mul(g, [1, gf_pow(2, i)])
return g

多项式除法#

最简单的还是长除法,举个例子说明如何计算12 34 56与生成多项式相除,老惯例,先对数据后面补4 byte的0:

1
2
3
4
5
6
7
8
9
10
11
12
                             12 da df
______________________
01 0f 36 78 40 ) 12 34 56 00 00 00 00
^ 12 ee 2b 23 f4
--------------
da 7d 23 f4 00
^ da a2 85 79 84
--------------
df a6 8d 84 00
^ df 91 6b fc d9
--------------
37 e6 78 d9

使用长除法的时候,商永远取最高位非零系数的值(如第一次取的12,在异或之后取的da)。1201 0f 36 78 40可以直接使用gf_poly_mul计算得到12 ee 2b 23 f4,再与原来的值相减(对应GF减法中的异或)。

与BCH码相似,最后得到的余数将会作为其纠错码,添加到数据末端,从而得到编码后的数据为:12 34 56 37 e6 78 d9

对应的代码实现为:

1
2
3
4
5
6
7
8
9
10
def gf_poly_div(dividend: List[int], divisor: List[int]) -> Tuple[List[int], List[int]]:
msg_out = list(dividend)
for i in range(len(dividend) - len(divisor) + 1):
coef = msg_out[i]
if coef != 0:
for j in range(1, len(divisor)):
if divisor[j] != 0:
msg_out[i + j] ^= gf_mul_lut(divisor[j], coef)
sep = -(len(divisor)-1)
return msg_out[:sep], msg_out[sep:]

编码函数#

最后得到的编码函数(msg_in代表需要编码的数据,nsym则为额外的纠错码个数):

1
2
3
4
5
def rs_encode_msg(msg_in: List[int], nsym: int) -> List[int]:
gen = rs_generator_poly(nsym)
_, remainder = gf_poly_div(msg_in + [0] * nsym, gen)
msg_out = msg_in + remainder
return msg_out

最后运行的结果:

1
2
3
4
5
6
7
8
>>> msg_in = [ 0x40, 0xd2, 0x75, 0x47, 0x76, 0x17, 0x32, 0x06,
... 0x27, 0x26, 0x96, 0xc6, 0xc6, 0x96, 0x70, 0xec ]
>>> msg = rs_encode_msg(msg_in, 10)
>>> for i in range(len(msg)):
... print(hex(msg[i]), end=' ')
...
0x40 0xd2 0x75 0x47 0x76 0x17 0x32 0x6 0x27 0x26 0x96 0xc6 0xc6 0x96 0x70 0xec
0xbc 0x2a 0x90 0x13 0x6b 0xaf 0xef 0xfd 0x4b 0xe0

是不是很简单……

RS码的解码过程#

RS码的解码过程为:从一个可能出现错误的RS码(错误可能在原始数据中,也可能在添加的纠错码中)中,返回一个正确的原始数据,即从先前计算得到的RS码中修复该数据。

抛开之前的穷举法不谈,RS码的解码的算法还是有挺多的,下面仅为一个例子,使用Berlekamp-Massey对其进行解码,该解码过程可以分为如下几部分:

为了不产生翻译的歧义,这里就直接使用原名。

  1. 计算syndromes polynomial,从而得知RS码是否发生错误;
  2. 计算erasure/error locator polynomial,得知数据中哪里发生错误;
  3. 计算erasure/error evaluator polynomialerasure/error magnitude polynomial,得知每个错误被修改的程度;
  4. 通过对数据数据直接与magnitude polynomial相减,从而修复输入数据。

在解码过程中,数据的错误类型可以分为以下两类:erasure(即知道哪里出错,但不知道正确数据)和error(即不知道哪里出错,也不知道正确数据)。每纠正一处erasure,需要花费一个纠错码,而纠正一处error,则需要花费两个纠错码。通常情况下,我们仅关注后面一种情况(因为在实际使用中,知道错误发生在何处的应用场景非常局限)。因此,在纠错码长度为nsym的情况下,每段数据所能容纳的最大的错误数为nsym/2。

Syndrome计算#

将输入的消息作为多项式的系数进行计算,而$x$分别对应每个$\alpha^0,\ldots,\alpha^{\text{nsym}}$。在没有发生错误的情况下,所有的计算结果都应该为0,否则该值将会被修复工作的后续步骤所使用。

1
2
3
4
5
def rs_calc_syndromes(msg: List[int], nsym: int) -> List[int]:
synd = [0] * nsym
for i in range(nsym):
synd[i] = gf_poly_eval(msg, gf_pow(2, i))
return [0] + synd

利用上面编码得到的数据,对其直接计算syndrome,以及在修改数据后计算syndrome,得到的结果如下:

1
2
3
4
5
6
7
>>> synd = rs_calc_syndromes(msg, 10)
>>> print(synd)
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] # not corrupted message = all 0 syndromes
>>> msg[0] = 0 # deliberately damage the message
>>> synd = rs_calc_syndromes(msg, 10)
>>> print(synd)
[0, 64, 192, 93, 231, 52, 92, 228, 49, 83, 245] # when corrupted, the syndromes will be non zero

Erasure纠正#

在错误位置已知的情况下,纠正数据是一件不太难的事情。在Syndrome已知的情况下,我们可以直接计算locator polynomial:

1
2
3
4
5
def rs_find_errata_locator(e_pos: List[int]) -> List[int]:
e_loc = [1]
for i in e_pos:
e_loc = gf_poly_mul(e_loc, gf_poly_add([1], [gf_pow(2, i), 0]))
return e_loc

其中e_pos为已知的错误的位置,这里使用的是该位置对应的$x$的指数的值,即在数据的最后一个byte时,对应的是$x^0$,则e_pos为0,而往前一个byte对应$x^1$,e_pos为1,以此类推。

接着使用locator polynomial计算其evaluator polynomial:

1
2
3
def rs_find_error_evaluator(synd: List[int], err_loc: List[int], nsym: int) -> List[int]:
_, remainder = gf_poly_div(gf_poly_mul(synd, err_loc), [1] + [0] * (nsym + 1))
return remainder

最后,使用Forney算法计算正确的数据的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def rs_correct_errata(msg_in: List[int], synd: List[int], err_pos: List[int]) -> List[int]:
coef_pos = [len(msg_in) - 1 - p for p in err_pos]
err_loc = rs_find_errata_locator(coef_pos)
err_eval = rs_find_error_evaluator(synd[::-1], err_loc, len(err_loc) - 1)[::-1]
x = []
for i in range(len(coef_pos)):
l = 255 - coef_pos[i]
x.append(gf_pow(2, -l))
e = [0] * len(msg_in)
xlength = len(x)
for i, xi in enumerate(x):
xi_inv = gf_inv(xi)
err_loc_prime_tmp = []
for j in range(xlength):
if j != i:
err_loc_prime_tmp.append(gf_sub(1, gf_mul_lut(xi_inv, x[j])))
err_loc_prime = 1
for coef in err_loc_prime_tmp:
err_loc_prime = gf_mul_lut(err_loc_prime, coef)
y = gf_poly_eval(err_eval[::-1], xi_inv)
y = gf_mul_lut(gf_pow(xi, 1), y)
if err_loc_prime == 0:
raise Exception('Could not find error magnitude')
magnitude = gf_div(y, err_loc_prime)
e[err_pos[i]] = magnitude
msg_in = gf_poly_add(msg_in, e)
return msg_in

继续用上面的例子来说,将首个字节的数据修改为0之后,再使用rs_correct_errata修复,可以看到数据可以还原到修改前的值(其中[0]代表首个字节发生了错误):

1
2
3
4
5
>>> msg[0] = 0
>>> synd = rs_calc_syndromes(msg, 10)
>>> msg = rs_correct_errata(msg, synd, [0])
>>> print(hex(msg[0]))
0x40

Error纠正#

要处理错误位置未知的数据,首先进行的是对出错的地方进行定位,计算其error locator polynomial:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def rs_find_error_locator(synd: List[int], nsym: int) -> List[int]:
err_loc = [1]
old_loc = [1]
synd_shift = 0
if len(synd) > nsym:
synd_shift = len(synd) - nsym
for i in range(nsym):
k = i + synd_shift
delta = synd[k]
for j in range(1, len(err_loc)):
delta ^= gf_mul_lut(err_loc[-(j+1)], synd[k-j])
old_loc = old_loc + [0]
if delta != 0:
if len(old_loc) > len(err_loc):
new_loc = gf_poly_scale(old_loc, delta)
old_loc = gf_poly_scale(err_loc, gf_inv(delta))
err_loc = new_loc
err_loc = gf_poly_add(err_loc, gf_poly_scale(old_loc, delta))
while len(err_loc) and err_loc[0] == 0:
del err_loc[0]
errs = len(err_loc) - 1
if errs * 2 > nsym:
raise Exception('Too many errors to correct')
return err_loc

接着,利用error locator polynomial,便可以将错误的位置穷举出来。当然,有一种更有效的方法叫Chien search,如何实现就留给读者考虑了。

1
2
3
4
5
6
7
8
9
def rs_find_errors(err_loc: List[int], nmsg: int) -> List[int]:
errs = len(err_loc) - 1
err_pos = []
for i in range(nmsg):
if gf_poly_eval(err_loc, gf_pow(2, i)) == 0:
err_pos.append(nmsg - 1 - i)
if len(err_pos) != errs:
raise Exception('Too many (or few) errors found')
return err_pos

接下来计算的是Forney syndrome,一种仅计算error(忽略了erasure)的syndrome,能够根据错误的位置,对损坏的数据内容进行修复:

1
2
3
4
5
6
7
8
def rs_forney_syndrome(synd: List[int], nmsg: int) -> List[int]:
erase_pos_rev = [nmsg - 1 - p for p in pos]
fsynd = list(synd[1:])
for i in range(len(pos)):
x = gf_pow(2, erase_pos_rev[i])
for j in range(len(fsynd) - 1):
fsynd[j] = gf_mul(fsynd[j], x) ^ fsynd[j + 1]
return fsynd

继续用上面的例子讲述定位错误以及纠正的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> print(hex(msg[10]))
0x96
>>> msg[0] = 6
>>> msg[10] = 7
>>> msg[20] = 8
>>> synd = rs_calc_syndromes(msg, 10)
>>> err_loc = rs_find_error_locator(synd, 10)
>>> pos = rs_find_errors(err_loc[::-1], len(msg)) # find the errors locations
>>> print(pos)
[20, 10, 0]
>>> msg = rs_correct_errata(msg, synd, pos)
>>> print(hex(msg[10]))
0x96

最后,将各个部分组装在一块,得到如下的错误纠正函数,msg_in对应的是完整的数据(即包括原始数据+RS纠错码)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def rs_correct_msg(msg_in: List[int], nsym: int) -> List[int]:
if len(msg_in) > 255:
raise Exception('Message is too long (exceeded 255 bytes)')
msg_out = list(msg_in)
synd = rs_calc_syndromes(msg_out, nsym)
if max(synd) == 0:
return msg_out[:-nsym]
fsynd = synd[1:]
err_loc = rs_find_error_locator(fsynd, nsym)
err_pos = rs_find_errors(err_loc[::-1], len(msg_out))
if err_pos is None:
raise Exception('Could not locate error')
msg_out = rs_correct_errata(msg_out, synd, err_pos)
return msg_out[:-nsym]

应用例子#

最后用一个例子讲述RS码编码和解码的过程。在RS码中,有几个重要的参数,一个是$n$,指的是编码后的数据长度,其数值不超过$2^m-1$($m$为GF(2^m)的指数,这里使用的是$m=8$),另外一个是$k$,纠错码的长度,即代码中的nsym。因此,有效的数据长度为$n-k$。除了RS的参数外,还有一个影响GF的参数,那就是本原多项式(primitive polynomial),它决定了由它构成的GF的四则运算的数值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
prim = 0x11d
n = 20
message = 'hello world'
k = len(message)
if k > n:
raise Exception('Message too long')

init_table(prim)

# Message encode
ecc_msg = rs_encode_msg([ord(x) for x in message], n - k)
print('Original: %s' % str(ecc_msg))

# Erase some data
ecc_msg[0] = 0
ecc_msg[1] = 2
ecc_msg[4] = 2
print('Corrupted: %s' % str(ecc_msg))

corrected_msg = rs_correct_msg(ecc_msg, n - k)
print('Repaired: %s' % str(corrected_msg))
print(''.join([chr(x) for x in corrected_msg]))

最后得到的输出应该如下:

1
2
3
4
Original:  [104, 101, 108, 108, 111,  32, 119, 111, 114, 108, 100, 145, 124, 96, 105, 94, 31, 179, 149, 163]
Corrupted: [ 0, 2, 108, 108, 2, 32, 119, 111, 114, 108, 100, 145, 124, 96, 105, 94, 31, 179, 149, 163]
Repaired: [104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]
hello world

总结#

这次简单介绍了在GF(2^8)情况下,RS码的编码和解码过程的代码实现,对其中详细的数学原理感兴趣的读者可以自行阅读相关算法的文章。同上篇一致,提供的python代码仅供娱乐,切勿直接使用。在目前这种情况下,每次编码后的数据长度不多于255。长度超于该值的都需要分段,每段分别进行独立编码,再进行拼接。因此,在数据存储上来说,在一大块连续数据失效的情况下,依旧没有太大作用。当然了,对数据视为多维的矩阵,进行transpose,或者用其他shuffle的样式打乱原数据的顺序,可以简单粗暴地防止出现数据连续失效的情况,尽可能把失效的数据分摊到不同的校验块之中。

后续将会讲述GF(2^p)的编码和解码过程,对数据进行紧凑编码时,进行额外的padding,使输入和输入数据能够充分利用GF的范围。