def editDistDP(s1, s2):
    """Compute the edit (Levenshtein) distance between two strings.

    params: s1 (str), s2 (str)
    """
    s1, s2 = s1.strip(), s2.strip()
    m, n = len(s1), len(s2)
    # Build a table that records the answer to every subproblem
    dp = [[0 for x in range(n + 1)] for x in range(m + 1)]
    # Fill the DP table top-down
    for i in range(m + 1):
        for j in range(n + 1):
            # Transforming from/to an empty prefix costs as many edits as the other prefix's length
            if i == 0 or j == 0:
                dp[i][j] = max(i, j)
            # If the two prefixes end with the same character, the distance is unchanged
            elif s1[i - 1] == s2[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            # Otherwise consider the three operations and take the smallest edit distance:
            # substitution, insertion, deletion
            else:
                dp[i][j] = 1 + min(dp[i - 1][j - 1], dp[i][j - 1], dp[i - 1][j])
    return dp[m][n]
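A quick sanity check with the classic example (the input strings are illustrative, not from the original):

print(editDistDP("kitten", "sitting"))  # 3: substitute k->s, substitute e->i, append g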
import collections.abc
import hashlib
import logging
import numbers
import re
from itertools import groupby


def _hashfunc(x):
    # Not shown in the original excerpt; this md5-based default matches the
    # simhash package: interpret the digest as an unsigned integer.
    return int(hashlib.md5(x).hexdigest(), 16)


class Simhash(object):

    def __init__(self, value, f=64, reg=r'[\w\u4e00-\u9fcc]+', hashfunc=None, log=None):
        """
        `f` is the dimension of the fingerprint, in bits.

        `reg` is meaningful only when `value` is a str; it describes what is
        considered a letter inside the parsed string. A compiled regexp object
        can also be specified (one attempt to handle any letter is to specify
        reg=re.compile(r'\w', re.UNICODE)).

        `hashfunc` accepts a utf-8 encoded string and returns an unsigned
        integer of at least `f` bits.
        """
        self.f = f
        self.reg = reg
        self.value = None

        if hashfunc is None:
            self.hashfunc = _hashfunc
        else:
            self.hashfunc = hashfunc

        if log is None:
            self.log = logging.getLogger("simhash")
        else:
            self.log = log

        if isinstance(value, Simhash):
            self.value = value.value
        elif isinstance(value, str):
            self.build_by_text(str(value))
        elif isinstance(value, collections.abc.Iterable):
            self.build_by_features(value)
        elif isinstance(value, numbers.Integral):
            self.value = value
        else:
            raise Exception('Bad parameter with type {}'.format(type(value)))
    def __eq__(self, other):
        """
        Compare two simhashes by their value.

        :param Simhash other: The Simhash object to compare to
        """
        return self.value == other.value
    def _slide(self, content, width=4):
        # Produce overlapping character n-grams (shingles) of the given width
        return [
            content[i:i + width]
            for i in range(max(len(content) - width + 1, 1))
        ]
    def _tokenize(self, content):
        content = content.lower()
        content = ''.join(re.findall(self.reg, content))
        ans = self._slide(content)
        return ans
    def build_by_text(self, content):
        features = self._tokenize(content)
        # Count how often each shingle occurs and use the count as its weight
        features = {k: sum(1 for _ in g) for k, g in groupby(sorted(features))}
        return self.build_by_features(features)
    def build_by_features(self, features):
        """
        Core method.

        `features` might be a list of unweighted tokens (a weight of 1 will
        be assumed), a list of (token, weight) tuples, or a token -> weight
        dict.
        """
        v = [0] * self.f                         # per-bit tally, initially [0, 0, 0, ...]
        masks = [1 << i for i in range(self.f)]  # single-bit masks: 0b0001, 0b0010, 0b0100, ...

        if isinstance(features, dict):
            features = features.items()

        for f in features:
            if isinstance(f, str):
                h = self.hashfunc(f.encode('utf-8'))  # hash the token to an integer
                w = 1
            else:
                assert isinstance(f, collections.abc.Iterable)
                h = self.hashfunc(f[0].encode('utf-8'))
                w = f[1]
            for i in range(self.f):
                # If bit i of the hash is 1, add w to the tally; otherwise subtract w
                v[i] += w if h & masks[i] else -w

        ans = 0
        for i in range(self.f):
            if v[i] > 0:  # a positive tally sets that bit of the fingerprint to 1
                ans |= masks[i]
        self.value = ans
    def distance(self, another):
        """Count the bit positions at which the two fingerprints differ (Hamming distance)."""
        assert self.f == another.f
        x = (self.value ^ another.value) & ((1 << self.f) - 1)  # (1 << self.f) - 1 is a mask of self.f one-bits
        ans = 0
        while x:
            ans += 1
            x &= x - 1  # Kernighan's trick: each step clears the lowest set bit
        return ans
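A quick sketch of the fingerprint in action (the strings are illustrative, not from the original):

a = Simhash(u'How are you? I am fine.')
b = Simhash(u'How are you? I am fine!')
print(a.distance(b))  # near-duplicate texts yield a small Hamming distance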
class SimhashIndex(object):

    def __init__(self, objs, f=64, k=2, log=None):
        """
        Near-duplicate string lookup built on Simhash.

        `objs` is a list of (obj_id, simhash):
        obj_id is a string, simhash is an instance of Simhash.
        `f` is the same as the one used for Simhash.
        `k` is the tolerance (maximum Hamming distance).
        """
        self.k = k
        self.f = f
        count = len(objs)

        if log is None:
            self.log = logging.getLogger("simhash")
        else:
            self.log = log

        self.log.info('Initializing %s data.', count)

        # Maps a block key to the set of '<fingerprint-hex>,<obj_id>' entries in that bucket
        self.bucket = collections.defaultdict(set)

        for i, q in enumerate(objs):
            if i % 10000 == 0 or i == count - 1:
                self.log.info('%s/%s', i + 1, count)
            self.add(*q)
    def get_near_dups(self, simhash):
        """
        `simhash` is an instance of Simhash.
        Returns a list of obj_id strings.
        """
        assert simhash.f == self.f

        ans = set()

        for key in self.get_keys(simhash):
            dups = self.bucket[key]
            self.log.debug('key:%s', key)
            if len(dups) > 200:
                self.log.warning('Big bucket found. key:%s, len:%s', key, len(dups))

            for dup in dups:
                sim2, obj_id = dup.split(',', 1)
                sim2 = Simhash(int(sim2, 16), self.f)

                d = simhash.distance(sim2)
                if d <= self.k:
                    ans.add(obj_id)
        return list(ans)
    def add(self, obj_id, simhash):
        """
        `obj_id` is a string.
        `simhash` is an instance of Simhash.
        """
        assert simhash.f == self.f

        for key in self.get_keys(simhash):
            v = '%x,%s' % (simhash.value, obj_id)
            self.bucket[key].add(v)
    def delete(self, obj_id, simhash):
        """
        `obj_id` is a string.
        `simhash` is an instance of Simhash.
        """
        assert simhash.f == self.f

        for key in self.get_keys(simhash):
            v = '%x,%s' % (simhash.value, obj_id)
            if v in self.bucket[key]:
                self.bucket[key].remove(v)
    @property
    def offsets(self):
        """
        Bit offsets that split the f-bit fingerprint into k+1 blocks. By the
        pigeonhole principle, two fingerprints within Hamming distance k must
        agree exactly on at least one block, so each block serves as a bucket
        key. You may optimize this method according to
        <http://www.wwwconference.org/www2007/papers/paper215.pdf>
        """
        return [self.f // (self.k + 1) * i for i in range(self.k + 1)]
    def get_keys(self, simhash):
        # Extract each block of the fingerprint and yield it together with its block index
        for i, offset in enumerate(self.offsets):
            if i == (len(self.offsets) - 1):
                m = 2 ** (self.f - offset) - 1
            else:
                m = 2 ** (self.offsets[i + 1] - offset) - 1
            c = simhash.value >> offset & m
            yield '%x:%x' % (c, i)
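    def bucket_size(self):
        # Not shown in the original excerpt; assumed to match the simhash
        # package, where it reports the number of distinct block keys stored.
        # The demo below calls this method.
        return len(self.bucket)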
data = {
    1: u'How are you? I am fine. blar blar blar blar blar Thanks.',
    2: u'How are you i am fine. blar blar blar blar blar Thanks.',
    3: u'This is a simhash test',
}
# Keep (id, fingerprint) pairs
objs = [(str(k), Simhash(v)) for k, v in data.items()]

# Build the SimhashIndex object; `k` is the tolerance
index = SimhashIndex(objs, k=10)
print("Number of simhash buckets:", index.bucket_size())

# Fingerprint the query text; get_near_dups returns the ids of similar texts
s1 = Simhash(u'How are you i am fine. blar blar blar blar blar thanks')
print("Similar text ids:", index.get_near_dups(s1))
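The index is also dynamic: the `add` and `delete` methods shown above update it in place. A quick illustration (the extra string is made up for the example):

s2 = Simhash(u'This is a simhash test too')
index.add('4', s2)
print(index.get_near_dups(s2))  # now includes '4' (distance to itself is 0)
index.delete('4', s2)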
import codecs  # needed below to read the test documents
import logging

import gensim.models as g
from gensim.corpora import WikiCorpus
from hanziconv import HanziConv
docvec_size = 192


class TaggedWikiDocument(object):

    def __init__(self, wiki):
        self.wiki = wiki
        self.wiki.metadata = True  # make get_texts() also yield (page_id, title)

    def __iter__(self):
        import jieba
        # The tags are what doc2vec has over word2vec: each document carries a label that aids training
        for content, (page_id, title) in self.wiki.get_texts():
            yield g.doc2vec.LabeledSentence(
                words=[w for c in content
                       for w in jieba.cut(HanziConv.toSimplified(c))],
                tags=[title])
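For context, a hedged sketch of how this iterator would feed training. The dump path, output path, and hyperparameters below are placeholders, not from the original, and it assumes the same older gensim API as the class above (where `LabeledSentence` still exists and `Doc2Vec` takes `size=`; gensim 4.x renamed these to `TaggedDocument` and `vector_size=`):

wiki = WikiCorpus('zhwiki-latest-pages-articles.xml.bz2')  # placeholder dump path
documents = TaggedWikiDocument(wiki)
model = g.Doc2Vec(documents, size=docvec_size, window=8, min_count=19, workers=4)
model.save('zhwiki_doc2vec.model')  # placeholder output path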
# Load a trained model; `model` and `test_docs` are file paths defined elsewhere
m = g.Doc2Vec.load(model)
test_docs = [x.strip().split() for x in codecs.open(test_docs, 'r', 'utf-8').readlines()]
# Infer vectors for the test documents
output = open(output_file, 'w')
a = []
for d in test_docs:
    # Inference is stochastic, so infer once and reuse the vector for both
    # the output file and the similarity computation
    vec = m.infer_vector(d, alpha=start_alpha, steps=infer_epoch)
    output.write(' '.join([str(x) for x in vec]) + '\n')
    a.append(vec)
output.flush()
output.close()
print(SimlarityCalu(a[0], a[1]))
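`SimlarityCalu` is never shown in this excerpt; judging by its use it compares two inferred document vectors, most plausibly by cosine similarity. A minimal hypothetical reconstruction:

import numpy as np

def SimlarityCalu(v1, v2):
    # Hypothetical: cosine similarity between two document vectors
    v1, v2 = np.asarray(v1), np.asarray(v2)
    return float(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))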