优化: 按不同维度切分
########## hadoop ##########
删除job
hadoop job -kill job_xxx
折半查找(有序查找)
数据统计(hive)
数据过滤
同类汇聚
全局排序
容错框架
常见应用
从日志中找到某一个条件(时间,用户)数据
除去非法数据,保留合法数据
数据格式整理
混合日志,按时间排序
按某个或多个字段排序
########## mapreduce ##########
分布式计算
hadoop fs -cat /xxx
hadoop fs -text /xxx
hadoop job -kill job_xxx_xxx
########## hdfs ##########
分布式分拣存储系统
nameNode secondaryNameNode dataNode
secondaryNameNode
用来保存HDFS的元数据信息,比如命名空间信息、块信息等,由于这些信息是在内存的,为了考虑持久化到磁盘
能做什么
– 存储并管理PB级数据
– 处理非结构化数据
– 注重数据处理的吞吐量(延迟不敏感)
– 应用模式:write-once-read-many存取模式(无数据一致性问题)
不适合做什么
– 存储小文件(不建议)
– 大量随机读(不建议)
– 需要对文件修改(不支持)
– 多用户写入(不支持)
########## storm ##########
实时数据分析需求
– 实时报表动态展现
– 数据流量波动状态
– 反馈系统
多机流式系统
– 流量控制
– 容灾冗余
– 路径选择
– 扩展
Storm任务没有结束,Hadoop任务执行完结束
• Storm延时更低,得益于网络直传、内存计算,省去了批处理的收集数据的时间
• Hadoop使用磁盘作为中间交换的介质,而storm的数据是一直在内存中流转的
• Storm的吞吐能力不及Hadoop,所以不适合批处理计算模型
• 没有持久化层
• 保证消息得到处理
• 支持多种编程语言
• 高效,用ZeroMQ作为底层消息队列
• 支持本地模式,可模拟集群所有功能
• 使用原语
以Tuple为基本单位组成的一条有向无界的数据流
Integer,long,short,byte,string,double,float,boolean和byte array,包括自定义类型
Topology
由spouts和bolts组成的图,通过stream grouping将图中的spouts和bolts连接起来
Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务, 你可以提交由任何语言
创建的topology
worker -> executor -> task(spout -> bolt)
流式计算
持续计算
分布式rpc
架构
nimbus -> supervisor -> worker
topology
spout/bolt
默认情况下task的数目等于executor线程数目, 即1个executor线程只运行1个task
########## zookeeper ##########
一个松散耦合的分布式系统中粗粒度锁以及可靠性存储的系统
Persistent Nodes
永久有效地节点,除非client显式的删除,否则一直存在
Ephemeral Nodes
临时节点,仅在创建该节点client保持连接期间有效,一旦连接丢失,zookeeper会自动删除该节点
Sequence Nodes
顺序节点,client申请创建该节点时,zk会自动在节点路径末尾添加递增序号,这种类型是实现分布式锁,分布式queue等特殊功能的关键
数据访问可以设置权限
机制:用户,权限
########## hbase ##########
优势
海量数据存储
快速随机访问
大量写操作的应用
应用场景
互联网搜索引擎数据存储
海量数据写入
消息中心
内容服务系统
大表复杂,多维度索引
大批量数据获取
三维有序
一个hregion是不会拆分到多个server上的
行锁
物理模型
region->hregion->hstroe->storefile->hfile
scan + filter
特殊的表
meta(存储用户表的元数据) 不做分裂
hstore -> column family
读取流程
rowkey -> memstore -> hfile
compaction和split
hfile 的数量过多的时候会限制写请求的速度
miner: 目的是为了保证服务不中断,但是合并不彻底
major: 目的是合并的更彻底,但是服务存在中断的风险
split
当一个region太大时,将其分裂成两个region,可以手动或者自动做
########## hive ##########
mapreduce 的封装
Hive基于一个统一的查询分析层,通过SQL语句的方式对HDFS上的数据进行查询、统计和分析
Hive本身不存储数据,它完全依赖HDFS和MapReduce
Hive的内容是读多写少,不支持对数据的改写和删除
三个属性: 空格 \t \001
列分隔符: \n
行分隔符:
读取文件数据的方法:textFile sequenceFile(二进制,kv形式序列化到文件中) RCFile(hive推出面向列的数据格式)
适合离线任务
可扩展性
UDF: 直接应用于select语句,通常查询的时候,需要对字段做一些格式化处理(比如大小写转换),特点:一对一
UDAF: 多对一场景
UDTF: 一对多场景
读时模式:只有hive读的时候才会检查,解析字段和schema,所以加载数据非常迅速
写时模式:写得慢,有事务和检查索引等,但是读会得到优化
海量数据挖掘, 实时性差
用户接口 -> 语句转换 -> 数据存储
hive默认表存放路径一般都是在你工作目录的hive目录里面,按表名做文件夹分开,如果你
有分区表的话,分区值是子文件夹,可以直接在其它的M/R job里直接应用这部分数据
table
external table
删除元数据,不删除数据 (推荐创建外部表)
partition
时间分区
bucket
按桶分: 针对某一列进行桶的组织,通常队列值做哈希
优化查询,缩小数据量关联
reduce个数计算公式 = inputFileSize / bytes per reducer
group by
分区剪裁(join/group by)
笛卡尔积(join的时候不加on条件或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积)
map join(/*+ MAPJOIN(tablelist) */,必须是小表,不要超过1G,或者50万条记录)
union all(先做union all再做join或group by等操作可以有效减少MR过程,尽管是多个Select,最终只有一个mr)
单用户/多用户/远程模式
########## flume ##########
分布式,可信任的弹性系统,用于高效收集,汇聚和移动大规模日志信息从多种不同的数据源到一个集中的数据存储中心
接入资源数据类型及接出数据类型
多路径流量,多管道接入流量,多管道接出流量,上下文路由
水平扩展
agent(source(输入) -> channel(缓存) -> sink(输出))
agent interceptor
log server ->(interceptor->source->selector) -> agent -> collector -> storage
event(header + byte payload)
Replicating Channel Selector (default): 将source过来的events发往所有channel
Multiplexing Channel Selector: 而Multiplexing 可以选择该发往哪些channel
########## kafka ##########
分布式,基于发布订阅的消息系统/分布式缓存
broker
producer
consumer
topic
partition: topic基础上做了进一步区分分层
producer和broker之间不存在负载均衡
consumer和broker之间存在负载均衡(zookeeper)
producer和broker之间: push模式
consumer和broker之间: pull模式
partion 内部有序
客户端持久化offset,消费者通过offset去读取消息
consumer 不允许同时读取多个partion的数据
partition 对应多个segment
三种保证策略
– At most once 消息可能会丢,但绝不会重复传输
– At least one 消息绝不会丢,但可能会重复传输
– Exactly once 每条消息肯定会被传输一次且仅传输一次
动态维护了一组(ISR)副本,表示已追上了leader,才可以被选为leader
########## yarn ##########
集群资源管理
resourceManater
作业调度及监控
applicationMaster代替jobtracker
可插拔的调度组件scheduler,负责运行中的各种应用分配资源,不负责应用程序的监控和状态跟踪
NodeManager代替taskTraker
########## hdfs2.0 ##########
namenode ha
namenode federation
将一部分文件迁移到其他nn进行管理,只有元数据管理和存放被分隔开,但是真实的数据存储还是共享
优势:
命名空间可以横向扩展
性能提升
资源隔离
应用:新的文件系统: viewfs(视图文件系统)
hdfs快照
主要用来做数据备份
基于时间点文件系统拷贝
修改时间倒序
快照数据是当前数据减去修改的部分计算出来的
只记录了block列表和文件大小
hdfs缓存
hdfs acl
########## spark ##########
一个分布式的并行计算框架
application -> job(action) -> stage -> task
spark context (变量/集群的配置信息)
shuffle: 数据混洗-核心机制:(相同的key分发到一个区域) 数据分区,排序,局部聚合,缓存,拉取,再合并排序
>============================================================================================>
正式课:
01
检索系统:
第一步: 召回阶段: 用token检索item
第二步: 过滤阶段: 把劣质的item过滤掉
第三步: 排序,把好的item拍前面
第四步: 截断,取top n
02
用户操作行为埋点 -> 日志收集 -> 结构化数据 -> 数据挖掘平台
mapreduce
先排序后溢写
hadoop streaming
单机调试
cat input | mapper | sort | reduce > output
controller v + controller a = ^A
controller v + controller i = ^I
03
nlp
tfidf/关键词提取/lcs
相似度
文本相似度
语义相似
字面相似
方案
语义相似: 依靠用户行为,最基本的方法:(1) 基于共点击的行为(协同过滤) (2) 回归算法
字面相似: (1) lcs最大公共子序列 (2)利用中文分词
解决:
余弦相似度: a*b / ||a||x||b||
a*b = n∑i=1(xi * yi)
|a| = sqrt(n∑i(xi)^2)
|b| = sqrt(n∑i(yi)^2)
BOW
bag of word
计算词频 -> 出现次数作为向量
套公式计算出相似度
TFIDF
TF(词频)
关键词: 在当前文章出现较多,在其他文章出现较少
TF = 某个词在文章中出现的次数/文章的总词数
TF = 某个词在文章中出现的次数/该文出现次数最多的词的出现次数
IDF(反文档频率)
log(语料库的文档总数/包含该词的文档数+1)
TF-IDF = 词频 * 反文档频率
自动摘要
(1) 确定关键词集合 1, top-10 2,阈值截断 > 0.8
(2) 哪些句子包含关键词,把句子取出来
(3) 对关键词排序,对句子做等级划分
(4) 把等级高的句子取出来就是摘要
TFIDF实践
数据预处理: 把所有文章的内容全部收集到一个文件中
LCS
从字面角度衡量字面相似度的方法之一
动态规划
lcs(xm,yn) = { ① lcs(xm-1, yn-1) + xm ; 当xm=yn ② max{lcs(xm-1, yn), lcs(xm,yn-1)} ;当xm≠yn
04
中文分词
推荐场景: 粗粒度
搜索场景: 细粒度 - 召回
切分方案
按位置索引切分
最基本的切词方法
最大长度匹配: 前向,后向
trie树常用于加速分词查找词典问题
dag
有向无环图
贝叶斯公式: p(s|c) = p(c|s) * p(s) / p(c) => p(s|c) = p(s)
加log防止向下溢出
一元模型 Unigram (前后两词出现概率是相互独立的)
二元模型 Bigram (一个词的出现仅依赖于它前面出现的一个词)
三元模型 Trigram (一个词的出现仅依赖于它前面出现的两个词)
jieba分词 (分词/词性标注)
去杂质 -> 词库分词(trie树) -> hmm分词(合并词(viterbi算法))
一般来说,分词个数小概率较大
jieba分词下载: https://github.com/fxsjy/jieba
实践
pyweb + jieba
05
HMM: 隐马尔可夫模型
马尔可夫模型(一个序列数据)
1,生成方式
2,路径选择(viterbi) -> 动态规划
初始概率
出现/总数
转移概率
出现某个事件的概率基础上出现的概率
ab ab ac
p(b|a) = 2/3
p(c|a) = 1/3
隐马尔可夫模型(两个序列数据)
初始概率 BEMS位置信息
一个词 -> S
开头 -> B
结尾 -> E
中间 -> M
前向概率
At(K) = sum(At-1(L) * P(s=k|o=l) * P(o=Ot|K))
06
推荐算法
基于内容推荐(CB)
用户画像
token -> item,item,item
userid -> token,token,token
相关性计算
A*B/|A||B|
A(123)
B(234)
N(A)∩N(B) / N(A)∪N(B) = 2/4
排序
基于行为推荐(CF)(数据量大)
收集大量的用户行为数据
矩阵相乘
冷启动
用户冷启动-数据或者标签
物品冷启动-相似的物品
系统冷启动-引入专家知识
先内容后行为
协同过滤实践
倒排式
通过mapreduce去分布式计算相似度
userid, item, score
缺点: 内存较大
分块式
UI -> M
IU -> MT
UU = UI * IU
07
分类算法
NB算法 -> 分类问题 -> 效果评估 -> LR逻辑回归 -> DNN
泛化能力
LR 二分类
Softmax 多分类
文章分类
特征: 分词, 关键词
机器学习
实现问题: 复杂模型通常和开销有一定正比例关系
过拟合: 对于训练集学习的太过深刻,缺少泛化能力
NB 朴素贝叶斯公式
P(X|Y) = P(X,Y)/P(Y)
P(X,Y) = P(X|Y)P(Y)
P(X,Y) = P(Y|X)P(X)
P(X|Y) = P(Y|X)P(X)/P(Y)
p(yi|X) = P(yi)p(X|yi)/P(X) -> p(yi|X) = P(yi)p(X|yi)
p(X|y=军事) = p(x=军舰|y=军事)*p(x=大炮|y=军事)*p(x=航母|y=军事)
一个好的模型:既要求准确,又要求召回
优点
简单高效
结果是概率,对二值和多值同样适用
缺点
独立性假设有时不合理
评估方法
混淆矩阵: PR, ROC, AUC
AUC
负样本排在正样本前面的概率
cat auc.raw | sort -t$'\t' -k2g |awk -F'\t' '($1==-1){++x;a+=y;}($1==1){++y;}END{print 1.0-a/(x*y);}'
x*y:正负样本pair对
a:错误的pair对
a/x*y:错误的概率
1-a/x*y:正确的概率
08
逻辑回归
线性回归
最小误差平方和
w1x1 + w2x2 + ... + wnxn + b = wx
w:权重向量
x:样本[x1,x2....,xn]
负对数似然
梯度下降法
批量梯度下降BGD -> 慢,效果好,实用性差
随机梯度下降SGD -> 效果差,速度快,实用性相对好
小批量梯度下降 mini-batch-BGD
09
多分类
逻辑回归是多分类的一种特殊形式
过拟合
原因: 过度训练
避免: 交叉验证/人工筛选/增加惩罚(l1/l2)/增加训练数据
欠拟合
原因: 模型没有很好捕捉到数据特征
解决方法: 决策树扩展分支/增加训练轮数
10
推荐系统实践
1,数据预处理
用户画像数据: user_profile.data
userid,性别,年龄,收入,地域
音乐物品元数据
itemid,name,desc,时长,地域,标签
用户行为数据
userid,itemid,用户对该物品的收听时长
整合出相关联数据到一份文件
gen_base.py
2,CB召回算法
以tokenid,itemid,score形式整理训练数据
利用jieba分词对item name进行中文分词
用协同过滤算法跑出item-item数据
得到基于cb的ii矩阵
对数据格式化,item->itemlist形式,整理出kv形式
写入到缓存数据库
3,CF召回算法
以tokenid,itemid,score形式整理训练数据
4,排序sklearn
5,系统和数据库进行整合
安装unix2dos(格式转换)
unix2dos cf_reclist.redis
写入redis
cat cb_reclist.redis | /usr/local/src/redis-2.8.3/src/redis-cli --pipe
推荐系统demo
解析请求
加载模型
检索候选集合
获取用户特征
获取物品特征
打分排序
top-n过滤
数据封装返回
推荐系统总结
文本相似度
1,
余弦相似度-计算个体间的相似程度
TF = 某个词在文章中出现的次数/文章的总词数
TF = 某个词在文章中出现的次数/该文出现次数最多的词的出现次数
IDF-反文档频率 = log(语料库的文档总数/包含该词的文档数+1)
找出关键字 -> 词频集合 -> 生成词频向量 -> 计算两个向量的余弦相似度
2,
自动摘要 -> 优点: 简单快速,结果比较符合实际情况; 缺点: 以词频作为衡量标准,不一定正确
3,
lcs
最长公共子序列 (不要求连接)
最长公共子串 (要求连接)
4,
暴力穷举法 -> 复杂度高不可用
5,
动态规划法 (使用二维数组)
中文分词
切分方案: 位置/标记序列
词典匹配方法: 前向查找/后向查找
trie树常用于加速分词查找词典问题
切分词图 dag
概率语言模型
P(S|C)的概率最大 (词组最少)
贝叶斯公式 P(S|C) = P(C|S) * P(S) / P(C) 因为 P(C|S) = 1, 所以 P(S1|C) / P(S2|C) = P(S1) / P(S2)
假设每个词之间的概率是上下文无关的(实际中是不成立的), 则 P(S) = P(w1,w2,...,wm) ≈ P(w1)×P(w2)×...×P(wm)∝logP(w1)+logP(w2)+...+logP(wm)
一元模型 -> 贝叶斯公式
N元模型 (使用n个单词组成的序列来衡量切分方案的合理性)
P(S) = P(w2|w1) = P(w2|w1) / P(w1)
如果简化成一个词的出现仅依赖于它前面出现的一个词,那么就称为二元模型(Bigram)
P(S) ≈ P(w1) P(w2|w1)P(w3|w2)…P(wn|wn-1)
如果简化成一个词的出现仅依赖于它前面出现的两个词,就称之为三元模型(Trigram)
如果一个词的出现不依赖于它前面出现的词,叫做一元模型(Unigram)
jieba分词
https://github.com/fxsjy/jieba
精确模式:将句子最精确的分开,适合文本分析
全模式:句子中所有可以成词的词语都扫描出来,速度快,不能解决歧义
搜索引擎模式:在精确模式基础上, 对长词再次切分, 提高召回
基于trie树
采用动态规划查找最大概率路径,找出基于词频的最大切分组合
对于未登录词,采用了基于汉字成词的HMM模型,使用了viterbi算法
jieba分词细节
加载登录词 -> 建立trie树分词模型 -> 建立分词dag词图 -> 计算全局概率 -> (登录词)按词性标注
加载登录词 -> 建立trie树分词模型 -> 建立分词dag词图 -> 计算全局概率 -> (未登录词)token识别 -> 加载隐马尔概率图模型
马尔科夫模型 (对一个序列数据建模)
每个状态只依赖之前有限个状态
初始概率
状态转移概率
马尔科夫模型参数估计
最大似然法
状态转移概率 P(St+1=l|St=k)
初始概率 P(S1=k)
隐马尔可夫模型(对两个序列数据建模)
观察和隐藏序列共同构成(s1 -> s2 && s1 -> o1)
O -> S
初始概率 πk = P(S)
BEMS 开头 中间结尾独立成词
BEMS + 词性 + 概率的log值
状态转移概率 P(St+1=l|St=k)
发射概率 隐藏序列 -> 观察序列
前向概率
动态规划
后向概率
动态规划
推荐算法
方法
基于内容推荐
优点: 提升推荐结果的相关性/结果可解释/推荐结果容易被用户感知
缺点: 无个性化/依赖item的深度分析
基于协同推荐
优点: 充分利用群体智慧/推荐精度高/利于挖掘隐含的相关性
缺点: 推荐结果解释性较差/时效性不强/冷启动问题
user-base: 用户喜欢那些跟他有相似爱好的用户喜欢的东西
item-base: 用户喜欢跟他过去喜欢的物品相似的物品
倒排式
相似度计算公式
uid,itemid,score
分块式
分类算法
二值分类
多值分类
水平关系
层级关系
朴素贝叶斯分类器
最大似然估计
概率模型 -> 预测 -> 评测
评测指标
准确率 准确数 / 全部
精确率 单类别准确 / 单类别全部
召回率 单类别准确 / 多类别全部
优点: 简单有效/结果是概率
缺点: 独立性假设有时不合理
评估方法: 混淆矩阵(ROC/AUC)
一般情况下准确率高、召回率就低,召回率高、准确率低
AUC
负样本排在正样本前面的概率
回归算法
线性回归
一元线性回归
最小化误差平方和
多元线性回归
f(x,w) = wx // 矩阵相乘
最小二乘法
最优点(令导数=0)
w* = (X^T*X)^-1 * X^TY
逻辑回归
p(y=1|x) = 1/1+exp(-wx)
负对数似然
梯度下降法进行优化
批量梯度下降BGD
随机梯度下降SGD
小批量梯度下降MBGD
LR模型
过拟合
一个模型在训练集上表现很好,但实际预测中效果却大打折扣
原因: 过度训练或特征数较少
避免: 交叉验证/人工筛选/机器筛选/增加惩罚
欠拟合
原因: 模型没有很好的捕捉到数据特征或学习能力低下
避免: 决策树扩展分支/增加训练轮数
softmax回归(逻辑回归的推广)
激活函数
损失函数
正则化
L1范数: Lasso回归
可以产生稀疏权值矩阵,可以用于特征选择
L2范数: Ridge回归
可以防止模型过拟合
实践总结
加载用户特性数据(用户/物品/分数)写入字典 -> 加载物品特性数据()写入字典 -> 加载模型 -> 从缓存获取物品信息 -> 获取用户特征 -> 获取物品特征 -> 排序 -> 过滤 -> 返回
11
HDFS
不适合随机读
HDFS2.0
高可用,对两个nn同时发生心态
ZKFC负责对自己管辖内的NN进行健康检查
JournalNodes守护进程,完成元数据activeNN和standbyNN的一致性
QJM: 最低法定人数管理机制
namenode federation
减轻单一NN压力,将一部分文件转移到其他NN上管理
快照
允许用户指定要缓存的HDFS路径
明确的锁定可以阻止频繁使用的数据被从内存中清除
集中化缓存管理对于重复访问的文件很有用
可以换成目录或文件,但目录是非递归的
缓存
允许用户指定要缓存的HDFS路径
明确的锁定可以阻止频繁使用的数据被从内存中清除
集中化缓存管理对于重复访问的文件很有用
可以换成目录或文件,但目录是非递归的
acl
hadoop fs -getfacl /input/acl
hdfs dfs -setfacl -m user:mapred:r-- /input/acl
hdfs dfs -setfacl -x user:mapred /input/acl
yarn
分布式操作系统
作用: 资源整合,为了让系统资源利用最大化,在同一套硬件集群上同时可以运行MR/spark/storm
container:是个进程,NM来启动、并且监控Container,通过心跳上报给RM
spark
批量计算框架
速度快,在内存中流转
多线程模型
适合低延时,内存密集型任务
单机模式
集群模式独立集群,不利于充分利用
yarn模式
sparkcontext
实践
spark 完成 cf
milib LR NB
中文分词 webpy + mr, 批量中文分词
弹性分布式数据集(RDD)
容错
Executor的内存分为3块
优化
避免创建重复的RDD
尽可能复用同一个RDD
对多次使用的RDD进行持久化处理(.cache())
MEMORY_AND_DISK_SER
避免使用shuffle类算子
reduceByKey或aggregateByKey算子会使用用户自定义的函数对每个节点本地相同的key进行预聚合,而groupByKey算子不会预聚合
技术栈
spark core
spark sql
spark streaming
mlib
graphx
spark-1.6.0
scala-2.10.5
union 算子不去重
12
hive
sql解析引擎
元数据存储在mysql上
不支持数据的改写和删除
可扩展(用户自定义函数)
UDF: 用户自定义普通函数
UDAF: 用户自定义聚合函数
UDTF: 用户自定义表生成函数
driver: 将sql转换成mr
优化
partition(分区)(按时间分区(查询时按时间查询))
bucket(分桶) reduce,采样,控制reduce的数量
map优化
优化并发个数
hive.map.aggr=true ,相当于开启Combiner功能
reduce优化
优化并发个数
set mapred.reduce.tasks=10
cluster by:相当于distribute by和sort by的结合,但是只默认升序排序
加快查询速度
分区
MAPJOIN(x): x指定为小表,内存处理,通常不超过1G或者50w记录
union all: 先把两张表union all, 然后再做join或者group by,可以减少mr数量
multi-insert & multi-group by
automatic merge
multi-count distinct 一个MR,拆成多个的目的是为了降低数据倾斜的压力
STREAMTABLE(x) x视为大表,不放到内存
hive默认把左表数据放到缓存中,右边的表的数据做流数据
数据倾斜
大小表关联
大大表关联
数据清洗
拼接随机数
空间换时间
13
实践
hive-1.2.2 安装配置
现在hdfs创建
hive -f create_rating_table.sql
建表时指定location,在hdfs上的位置
批量灌数据(.cvs)
导出到本地
INSERT OVERWRITE local directory '/root/xxx/xxx/xxx' select x,y from xxx;
导出到HDFS
INSERT OVERRITE directory '/xxx' select x,y from xxx;
分区表partion
partitioned by(dt STRING)
LOAD DATA LOCAL INPUT '/root/xxx/2008-08.data' OVERWRITE INTO TABLE xxx partition(dt='2008-08')
show partitions xxx;
bucket 桶
set hive.enforce.bucketing = true;
clustered by (userid) INTO 16 buckets;
详细查看 desc formatted xxx;
用户自定义函数
UDF
ADD JAR /xxx/yyy/zzz.jar
创建临时函数
create temporary function upper_func as 'Uppercase';
select upper_func(xxx) from yyy limit 10;
UDAF
UDTF
ADD FILE /xxx/yyy/zzz.awk
transform
select transform(xxx) using 'python transform.py' as (uuu) from ttt limit 10;
14
hbase
适合存储海量的稀疏数据
结构
rowkey / column family / column qualifier / version
物理模型
region (按数据量(10G)增加副本) / Memstore 128m flush ->/ storeFile(对应着column family)
行锁定
一个机器多个region
region数目
太多: 会增加zookeeper负担,造成读写性能下降
太少: 降低读写并发的性能,导致压力不够分散
对于region过大要做切分,切成更小粒度的region分散到其他region上去,来缓解压力,负载均衡
不允许系统自动切分,空闲时刻再做手动切分
合并:手动完成
架构
HMaster: 负载均衡,管理region,管理table元数据,权限控制
HRegionServer: 管理本地HRegion,读写HDFS,维护Table数据
本地化原则
zookeeper
通过zk获取HRegion的地址(客户端会缓存)
/hbase/root-region-server, Root region的位置
/hbase/table/-ROOT-, 根元数据信息
/hbase/table/.META, 元数据信息
/hbase/master, 当选的Mater
/hbase/backup-masters, 备选的Master
/hbase/rs,Region Server的信息
/hbase/unassigned, 未分配的Region
memstore: 写缓存,每一个column family 都有自己的memstore
blockCache: 读缓存,为了提供读取效率
Hlog, 避免数据丢失 hlog -> memstore -> storefile
一个RegionServer上的所有region共享一个hlog
hbase表的设计
rowkey, 在region里按照字母排序
column family 设计
尽量少,建议1-2个
hbase将cf的列放到一起,不同cf的数据分开存
flush和region合并的时候,触发的基本单位都是region
如果memstore里面通常存储少量的数据时候,没有必要flush
compaction
此线上业务都会将关闭自动触发Major Compaction功能,改为手动在业务低峰期触发
实践
安装zookeeper
1)按安装跟目录下创建myid文件,里面分别填写数字(保证不一样)
2)zoo.cfg
数据目录
dataDir=/xxx/yyy/zookeeper-3.4.5/data
server.0=master:8880:7770
server.1=slave1:8881:7771
server.2=slave2:8882:7772
3)分发到从节点,分别在所有节点执行:./bin/zkServer.sh start
查看进程: jps -> QuorumPeerMain
安装hbase-0.98.6-hadoop2
环境变量配置
1)hbase-env.sh
export JAVA_HOME=/usr/local/src/jdk1.8.0_172
export HBASE_MANAGES_ZK=false
2)hbase-site.xml
3)regionservers
slave1
slave2
4)把修改好的目录,分发到其他从节点上(scp)
5)启动hbase:./bin/start-hbase.sh
jps -> HMaster / HRegionServer
http://master:60010/master-status
6)进入终端:./bin/hbase shell
检查: status
create 'm_table', 'meta_data' , 'action'
list
desc xxx;
增加列簇:
alter 'm_table', {NAME=>'cf_new', VERSIONS=>3, IN_MEMORY=>true}
删除列簇:
alter 'm_table', {NAME=>'action', METHOD=>'delete'}
删除表:
disable "m_table"
drop "m_table"
写数据:表名,roykey,列名,值
put "m_table", '1001', 'meta_data:name', 'zhang3'
put "m_table", '1001', 'meta_data:age', '18'
put "m_table", '1002', 'meta_data:name', 'li4'
put "m_table", '1002', 'meta_data:gender', 'man'
从内存写到硬盘(hdfs)
flush "m_table"
读数据:
全表扫描
scan "m_table"
读一条记录:
get "m_table", '1001'
修改版本号:
alter "m_table", {NAME=>'meta_data', VERSIONS=>3}
修改数据
put "m_table", '1001', 'meta_data:name', 'wang5'
查询指定时间戳数据
> get "m_table", '1001', {COLUMN=>"meta_data:name", TIMESTAMP=>1548591893705}
> get "m_table", '1001', {COLUMN=>"meta_data:name", TIMESTAMP=>1548592394970}
查询指定版本数据
get "m_table", '1001', {COLUMN=>"meta_data:name", VERSIONS=>1}
get "m_table", '1001', {COLUMN=>"meta_data:name", VERSIONS=>2}
知道value反查记录:
> scan "m_table", FILTER=>"ValueFilter(=, 'binary:wang5')"
> scan "m_table", FILTER=>"ValueFilter(=, 'binary:zhang3')"
根据value漫匹配查询
> scan "m_table", FILTER=>"ValueFilter(=, 'substring:ang')"
两个条件同时限制,对列名前缀做校验
> scan "m_table", FILTER=>"ColumnPrefixFilter('na') AND ValueFilter(=, 'substring:ang')"
put "m_table", '3001', 'meta_data:name', '777'
过滤开头
scan "m_table", FILTER=>"PrefixFilter('10')"
scan "m_table", FILTER=>"PrefixFilter('2')"
过滤行(ROW)
scan "m_table", {STARTROW=>'1002'}
scan "m_table", {STARTROW=>'1005'}
从1002开始,满足条件的,检索出来:
scan "m_table", {STARTROW=>'1002', FILTER=>"PrefixFilter('20')"}
scan "m_table", {STARTROW=>'1002', FILTER=>"PrefixFilter('20')"}
正则查找
put "m_table", 'user|4001', 'meta_data:name', '888'
正则过滤:
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.SubstringComparator
import org.apache.hadoop.hbase.filter.RowFilter
> scan "m_table", {FILTER=>RowFilter.new(CompareFilter::CompareOp.valueOf('EQUAL'), RegexStringComparator.new('^user\|\d+$'))}
查看表的行数:
> count "m_table"
清空词表:
> truncate "m_table"
15
flume
agent 一个完整的数据收集工具(守护进程)
头部: kv
body: 包含实际内容
source -> channel -> sink
channel:存储池,只有sink组件处理后的事件,才会在channel中删除该事件
1)filechannel:存在磁盘——保证数据不丢失,慢
2)memorychannel:存在内存——快,存在丢数据风险
interceptor
header 里面
1)Timestamp:添加事件
2)host:添加hostname或者ip
3)static:自定义key,value
4)regex filter:通过正则来清洗event
5)regex extractor:通过正则在header中添加指定key,value(来自对数据的分析)
selector(选择器)
根据指定header值,将event发送到不同的channel
flume支持一个源发送事件到多个通道(channel)中——》事件流的复用
支持两种模式:
(1)replicating:复制(默认)
(2)multiplexing:复用
运行
./bin/flume-ng agent --conf conf --conf-file ./conf/flume_exec.conf --name a1 -Dflume.root.logger=INFO,console
实践
输出到终端
./bin/flume-ng agent --conf conf --conf-file ./conf/flume_netcat.conf --name a1 -Dflume.root.logger=INFO,console
输出到文件
./bin/flume-ng agent --conf conf --conf-file ./conf/flume_exec.conf --name a1 -Dflume.root.logger=INFO,console
输出到hdfs
./bin/flume-ng agent --conf conf --conf-file ./conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
集群
故障转移
负载均衡
拦截与过滤
复制
复用
kafka
分布式的消息队列系统,同时提供数据分布式缓存功能(默认七天)
消息持久化到磁盘,预读后写,对磁盘的顺序访问
partition(一个或多个partition组成一个topic)
index log (索引信息)
message log (真是数据)
consumer group
sla-消息保留策略
交付保证
at least once: 至少一次 (会重复,但不丢失)
at most noce: 最多发送一次 (不重复,但可能丢失)
exactly once: 只有一次,客户端维护
集群
isr
follower 集合
实践
提前启动zookeeper
单机版
启动 ./bin/kafka-server-start.sh config/server.properties
查看topiclist ./bin/kafka-topics.sh --list --zookeeper localhost:2181
创建topic ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
查看topic描述 ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test
自带发送数据 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic newyear_test
自带接收数据 ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning
删除topic ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test
集群版
server.properties
broker.id 需要不一样
分别在slave1和slave2上开启进程
./bin/kafka-server-start.sh config/server.properties
创建多副本topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test
offset偏移来查询消息
flume + kafka
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i1.headerName = key
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
storm + zookeeper
分布式锁服务
节点类型
persistent nodes
永久有效节点
ephemeral nodes
临时节点
persistent nodes + sequence nodes (顺序节点)
ephemeral nodes + sequence nodes (顺序节点)
监控
getData:数据是否被修改
getChildren:父节点下的子节点列表变化
exists:节点是否存在
应用场景
配置管理
集群管理
队列管理
FIFO
顺序节点
同步队列
顺序节点
实践
不同节点my.id不一样,指定集群地址
kazoo
storm
storm流式处理
缺点: 吞吐能力差
优点: 时效性好,毫秒级别,增量式处理
原语: spout和bolt
stream: 数据流
tuple: 最基本的数据单元
grouping: shuffle/fields
spout: 消息生产者
bolt: 消息处理逻辑
过滤,访问外部服务
模式
流式
持续计算
分布式rpc
架构
主:numbus
从:supervisor (监控工作)
worker (工作进程)
task: 线程
spout和bolt
executor进程,里面维护很多task,每次只会执行一个task
zookeeper 协调管理
容错
timeout
ack
实践
启动zookeeoer
启动
master:
python bin/storm nimbus &
python bin/storm ui &
python bin/storm logviewer &
slave:
python bin/storm supervisor &
python bin/storm logviewer &
监控页面:
http://192.168.87.10:8080/index.html
实时推荐
Flume+kafka+Storm+Hbase
可整合搭配
Flume+kafka+Storm+Hbase+Hive
spark streaming
流式处理框架
秒级
针对Dstream开发,处理结构以DstreamGraph形式展现
DStream的算子
transformation
output
执行算子 forEachRDD 对接外部服务
输出算子 saveAsTextFile 直接输出
架构
master 分配任务
worker 处理任务
client 输入数据
处理数据模式
1)recevier模式:被动——异步
优点:快
缺点:启动多executor
2)direct模式:主动——同步
优点:一个executor占用资源少
缺点:慢
容错
wal
给streamingContext设置checkpoint的目录,该目录必须是HADOOP支持的文件系统,用来保存WAL和做Streaming的checkpoint
spark.streaming.receiver.writeAheadLog.enable 设置为true
实践
kafka+streaming(kafka+storm)
数据积压:下游处理速度慢(并发不够、处理速度慢)
kafka -> streaming
1)数据分布,调节offset——紧急
2)并发调大,需要kafka配合(增加分区数),提高线程数量
3)控制批次的规模—— max.poll.records
4)控制数据处理时间(timeout)—— max.poll.interval.ms
spark sql
处理结构化数据
Dstream 模板
DataFrame 分布式table
读数据,基于列的内部优化
具备懒惰机制
HiveContext(hive)
SqlContext(hdfs)
Catalyst工作流程
基于规则优化
经验式优化思路
join算子
外排
大循环外排(不用)
游标式外排(归并)
内排
基于代价优化
评估每种策略选择的代价,确定代价最小的方案
代价估计模型-调整join的顺序,减少中间shuffle数据的模型
内存管理
tungsten内存管理器
实践
从hdfs的原始text中读数据
从hive中读数据
UDF
UDAF
streaming + sql + hbase
创建DataFrame
已有RDD
结构化数据文件
Json数据
Hive表
外部数据库
logserver
thrift-0.9.3
解压后进入根目录
安装依赖库(yum库)
yum install boost-devel-static libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev ant
安装c、c++源码包通常3步
1)./configure --with-cpp --with-boost --with-python --without-csharp --with-java \
--without-erlang --without-perl --without-php --without-php_extension --without-ruby --without-haskell --without-go
2)make
3)make install
pip install thrift==0.9.3
python -> c++ c++ -> python
搭建nginx服务器用来做分发器
nginx-1.14.0
公共网关接口(cgi)
模拟用户请求
ab压测
安装
yum install httpd-tools
命令
ab -c 20 -n 5000 'http://192.168.87.12/badou_recsys?userid=333&itemid=222&type=click&ip=1.0.0.10'
kmeans
排序算法
cf/cb
分类,回归
排序模型
LR,softmax,dnn,dt
聚类算法
硬聚类/软聚类
相似度计算
距离矩阵,相似度矩阵
评估方法
内部评估
外部评估
凝聚层次聚类
单链
全链
组合连
非层次聚类
先简单分类,然后不断计算聚类的中心
k平均
随机
多轮随机: 选择最小的wcss
适合球面类型
半个性化推荐
用户大,对用户进行聚类抽象出不同的向量类
实践
对文章进行聚类
DL
感知机(二分类)
深度学习的基本结构是深度神经网络
深度学习适合不可知域,若有领域知识的话,传统算法表现更好
pythorch
embedding
决策树
信息熵
id3
按信息收益最大的来作为根节点
c4.5
按信息增递率
如果树太深,模型容易过拟合
sklearn
模型融合(随机森林,GBDT)
多个模型融合
bagging
多个模型进行平均
并行学习
boosting
采样,对错样本加权采样
多个模型按照分类效果加权融合
串行学习
AdaBoost(自适应增强)
shrinkage(缩减)
bm25
搜索相关性评分
word2vec
霍夫曼树(最优二叉树)
MF
基于矩阵分解的推荐算法(隐语义模型(ALS,LFM,SVD,SVD++)
混合CF
ALS
交替最小二乘
坐标下降法
LFM
梯度下降法
SVD
带偏置的LFM
SVD++
增加用户对各个隐因子的喜好程度
ANN(多维空间检索算法)
图像/分类
annoy建树
CNN(卷积神经网络)
分类/面向自然语言处理/机器翻译
RNN(循环神经网络)
分类/面向自然语言处理,机器翻译
上一篇:gRPC go版本的初体验