Logstash导入数据库BLOB数据,配合ES做文档检索

准备工作

安装插件 logstash-filter-base64

# Install the Elasticsearch ingest-attachment plugin (run from the ES bin directory);
# it extracts text from base64-encoded documents at index time.
elasticsearch-plugin install ingest-attachment

# Install the Logstash base64 filter plugin (run from the Logstash bin directory).
[root@localhost bin]# pwd
/root/logstash-6.5.0/bin
[root@localhost bin]# ./logstash-plugin install logstash-filter-base64

如果下载很慢,可在更新 Ruby 后将 gem 源设置为国内镜像地址

参考网址 
https://gems.ruby-china.com/

ES 需做准备工作

创建索引并自定义映射(mapping),这里和“8”对应

## Create the index
PUT sirm_ir_report

## Create the mapping for doc type "reports"
## (the ik_max_word / ik_smart analyzers require the IK analysis plugin)
PUT /sirm_ir_report/reports/_mapping
{
"properties": {
"objid": {
"type": "long"
},
"sourceentity": {
"type": "text"
},
"sourceid": {
"type": "long"
},
"name": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"type": {
"type": "long"
},
"contentsize": {
"type": "long"
},
"filetype": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"createtimestamp": {
"type": "date"
},
"entityname": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"attachment": {
"properties": {
"content": {
"type": "text",
"analyzer": "ik_max_word"
},
"content_type": {
"type": "text",
"analyzer": "ik_max_word"
},
"title": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
}

## "attachment" holds the fields extracted by the ingest-attachment processor

## Verify the mapping was created
GET /sirm_ir_report/_mapping

创建自定义管道处理数据

查看已有管道

GET _ingest/pipeline

创建管道(chenliang-pipeline 为管道名称)

## Create the ingest pipeline: the attachment processor decodes the base64
## "data" field and extracts content / title / content_type.
## indexed_chars = -1 removes the default extracted-character limit;
## ignore_missing = true skips documents that have no "data" field.
PUT _ingest/pipeline/chenliang-pipeline
{
"description": "chenliang-pipeline",
"processors": [
{
"attachment": {
"field": "data",
"indexed_chars": -1,
"properties": [
"content",
"title",
"content_type"
],
"ignore_missing": true
}
}
]
}

Logstash配置文件修改

修改配置

/root/logstash-6.5.0/config
[root@localhost config]# vim logstash.yml
.
.
.
## Set the pipeline id. NOTE(review): pipeline.id is Logstash's own pipeline
## identifier; it does not have to match the ES ingest pipeline name, which is
## applied via the "pipeline" option in the elasticsearch output below.
pipeline.id: chenliang-pipeline
.
.
.

## Save and quit

创建Logstash自定义配置文件

oracle.conf(名称随意,后缀必须是 .conf;ojdbc6.jar 请自行下载)

input {
jdbc {
# JDBC connection settings
jdbc_connection_string => "jdbc:oracle:thin:@127.0.0.1:1521/orcl"
jdbc_driver_library => "/root/logstash-6.5.0/jars/ojdbc6.jar"
# the "Java::" prefix is the known workaround for loading the Oracle driver under JRuby
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_user => "xxx"
jdbc_password => "xxx"
jdbc_page_size => 10
jdbc_paging_enabled => "true"
# Time zone used when converting database timestamps
jdbc_default_timezone => "Etc/UTC"
# Cron-style schedule: run every minute
schedule => "* * * * *"
# Incremental-update settings
# Record last-run state: the last tracking_column value is saved to last_run_metadata_path
record_last_run => "true"
# Track a user-chosen column instead of the default run timestamp
use_column_value => "true"
# Column to track; must be monotonically increasing (typically the primary key)
tracking_column => "objid"
# File storing the max id seen by the previous run (feeds :sql_last_value)
last_run_metadata_path => "/root/logstash-6.5.0/config/ir_report_last_id"
# If true, the recorded state is cleared and every run re-reads all rows
clean_run => "false"
# SQL statement file (see ir_report.sql below)
statement_filepath => "/root/logstash-6.5.0/config/ir_report.sql"
}
}

filter {
mutate {
# Drop Logstash bookkeeping fields so they are not indexed into ES.
# remove_field takes an array; listing both in one setting is the
# documented form (the original repeated the option twice).
remove_field => ["@version", "@timestamp"]
}
# logstash-filter-base64: base64-encode the raw BLOB bytes so the
# ingest-attachment processor can parse them.
# "data" must match the column alias in the SQL (CONTENT AS DATA).
base64 {
field => "data"
action => "encode"
}
}

output {
elasticsearch {
# Index name (analogous to a database name)
index => "sirm_ir_report"
# Document type (analogous to a table); must match the mapping's type
document_type => "reports"
# Document id (analogous to a table's id column); %{objid} comes from the SQL result
document_id => "%{objid}"
# ES address
hosts => "192.168.122.129:9200"
# Ingest pipeline applied on the ES side (decodes the "data" field)
pipeline => "chenliang-pipeline"
}

stdout {
# Console output, for monitoring results during ingestion
codec => json_lines
}
}

ir_report.sql

-- Incremental export of REPORT attachments for Logstash.
-- CONTENT is aliased to DATA so the base64 filter ("data" field) picks it up;
-- :sql_last_value is injected by Logstash from last_run_metadata_path.
SELECT 
OBJID,SOURCEENTITY AS TITLE,
SOURCEENTITY,SOURCEID,NAME,
TYPE,CONTENTSIZE,FILETYPE,
CONTENT AS DATA ,
CREATETIMESTAMP,ENTITYNAME
FROM SIRM_ATTACHMENT WHERE SOURCEENTITY = 'REPORT'
AND SOURCEID = '4208'
AND OBJID > :sql_last_value

## sql_last_value 为 ir_report_last_id文件的值
## CONTENT AS DATA, 这里的data对应 conf文件里面的 base64-> "data"

如何启动

[root@localhost logstash-6.5.0]# cd bin/
[root@localhost bin]# ./logstash -f ../config/oracle.conf --path.data=/root/datas/

## -f loads the custom pipeline config file
## --path.data sets the data directory (must differ from the default instance's data path)

问题汇总

CentOS 7 直接安装 gcc 7

# Enable the Software Collections repo and install the GCC 7 toolchain.
sudo yum install centos-release-scl
sudo yum install devtoolset-7-gcc*
# Starts a new shell with devtoolset-7 active; only affects that session.
scl enable devtoolset-7 bash
which gcc
gcc --version

Ruby更新

# Build Ruby 2.3.5 from source and install it under /opt/ruby.
# Use HTTPS for the download (the original used plain HTTP).
$ wget https://cache.ruby-lang.org/pub/ruby/2.3/ruby-2.3.5.tar.gz
$ tar zxvf ruby-2.3.5.tar.gz
$ cd ruby-2.3.5
$ ./configure --prefix=/opt/ruby
$ make && make install
# -f overwrites any pre-existing symlink/binary so the link step cannot fail
$ ln -sf /opt/ruby/bin/ruby /usr/bin/ruby
$ ln -sf /opt/ruby/bin/gem /usr/bin/gem
$ ruby -v