otter实现跨机房云边协同
1.otter介绍1.1otter是什么?Ottter是由阿里开源的一个数据同步产品,它的最初的目的是为了解决跨国异地机房双A架构,两边可写的场景,开发时间从2011年7月份一直持续到现在,目前阿里巴巴B2B内部的本地/异地机房的同步需求基本全上了Otter。Otter基于数据库增量日志解析,支持mysql/oracle数据库进行同步,在最新的v4.2.13已经支持mysql5.7以及阿里云提供的
1.otter介绍
1.1otter是什么?
Ottter是由阿里开源的一个数据同步产品,它的最初的目的是为了解决跨国异地机房双A架构,两边可写的场景,开发时间从2011年7月份一直持续到现在,目前阿里巴巴B2B内部的本地/异地机房的同步需求基本全上了Otter。
Otter基于数据库增量日志解析,支持mysql/oracle数据库进行同步,在最新的v4.2.13已经支持mysql5.7以及阿里云提供的RDS数据库(使用RDS童鞋的福音)
1.2otter工作原理
- List item
- List item
- db : 数据源以及需要同步到的库
- Canal : 用户获取数据库增量日志
- manager : 配置同步规则设置数据源同步源等
- zookeeper : 协调node进行协调工作
- node : 负责任务处理处理接受到的部分同步工作
1.3otter的特性
- 使用纯JAVA开发,占时资源比较高
- 基于Canal获取数据库增量日志,Canal是阿里爸爸另外一个开源产品
- 使用manager(web管理)+node(工作节点),manager负责配置监控,node负责处理任务
- 基于zookeeper,解决分布式状态调度的,允许多node节点之间协同工作
- 使用aria2多线程传输技术,对网络依赖带宽依赖较低
1.4otter能解决什么问题
1、异构库同步
Otter支持从Mysql同步到Mysql/oracle,我们可以把mysql同步到oracle
2、单机房同步
可以作为一主多从同步方案,对于单机房内网来说效率非常高,还可以做为数据库版本升级,数据表迁移,二级索引等这类功能
3、异地机房同步
异地机房同步可以说是Otter最大的亮点之一,可以解决国际化问题把数据从国内同步到国外来提供用户使用,在国内场景可以做到数据多机房容灾
4、双向同步
双向同步是在数据同步中最难搞的一种场景,Otter可以很好的应对这种场景,Otter有避免回环算法和数据一致性算法两种特性,保证双A机房模式下,数据保证最终一致性
5、文件同步
站点镜像,进行数据复制的同时,复制关联的图片,比如复制产品数据,同时复制产品图片
1.5安装otter
具体安装参考:https://blog.csdn.net/dong_007_007/article/details/78643159
2.Beautiful Soup 的简介
简单来说,Beautiful Soup 是 python 的一个库,最主要的功能是从网页抓取数据。官方解释如下:
Beautiful Soup 提供一些简单的、python 式的函数用来处理导航、搜索、修改分析树等功能。它是一个工具箱,通过解析文档为用户提供需要抓取的数据,因为简单,所以不需要多少代码就可以写出一个完整的应用程序。 Beautiful Soup 自动将输入文档转换为 Unicode 编码,输出文档转换为 utf-8 编码。你不需要考虑编码方式,除非文档没有指定一个编码方式,这时,Beautiful Soup 就不能自动识别编码方式了。然后,你仅仅需要说明一下原始编码方式就可以了。 Beautiful Soup 已成为和 lxml、html6lib 一样出色的 python 解释器,为用户灵活地提供不同的解析策略或强劲的速度。
2.1Beautiful Soup 安装
Beautiful Soup 3 目前已经停止开发,推荐在现在的项目中使用 Beautiful Soup 4,不过它已经被移植到 BS4 了,也就是说导入时我们需要 import bs4 。所以这里我们用的版本是 Beautiful Soup 4.3.2 (简称 BS4),另外据说 BS4 对 Python3 的支持不够好,不过我用的是 Python2.7.7,如果有小伙伴用的是 Python3 版本,可以考虑下载 BS3 版本。 可以利用 pip 或者 easy_install 来安装,以下两种方法均可
pip install beautifulsoup4
Beautiful Soup 支持 Python 标准库中的 HTML 解析器,还支持一些第三方的解析器,如果我们不安装它,则 Python 会使用 Python 默认的解析器,lxml 解析器更加强大,速度更快,推荐安装。
2.2Beautiful Soup 使用
首先必须要导入 bs4 库
from bs4 import BeautifulSoup
我们创建一个字符串,后面的例子我们便会用它来演示
html = """
<html><head><title>The Dormouse's story</title></head>
<body>
<p class="title" name="dromouse"><b>The Dormouse's story</b></p>
<p class="story">Once upon a time there were three little sisters; and their names were
<a href="http://example.com/elsie" class="sister" id="link1"><!-- Elsie --></a>,
<a href="http://example.com/lacie" class="sister" id="link2">Lacie</a> and
<a href="http://example.com/tillie" class="sister" id="link3">Tillie</a>;
and they lived at the bottom of a well.</p>
<p class="story">...</p>
"""
创建 beautifulsoup 对象
soup = BeautifulSoup(html)
另外,我们还可以用本地 HTML 文件来创建对象,例如
soup = BeautifulSoup(open('index.html'))
上面这句代码便是将本地 index.html 文件打开,用它来创建 soup 对象 下面我们来打印一下 soup 对象的内容,格式化输出
print soup.prettify()
<html>
<head>
<title>
The Dormouse's story
</title>
</head>
<body>
<p class="title" name="dromouse">
<b>
The Dormouse's story
</b>
</p>
<p class="story">
Once upon a time there were three little sisters; and their names were
<a class="sister" href="http://example.com/elsie" id="link1">
<!-- Elsie -->
</a>
,
<a class="sister" href="http://example.com/lacie" id="link2">
Lacie
</a>
and
<a class="sister" href="http://example.com/tillie" id="link3">
Tillie
</a>
;
and they lived at the bottom of a well.
</p>
<p class="story">
...
</p>
</body>
</html>
以上便是输出结果,格式化打印出了它的内容,这个函数经常用到,小伙伴们要记好咯。
四大对象种类
Beautiful Soup 将复杂 HTML 文档转换成一个复杂的树形结构,每个节点都是 Python 对象,所有对象可以归纳为 4 种:
- Tag
- NavigableString
- BeautifulSoup
- Comment
(1)Tag
Tag 是什么?通俗点讲就是 HTML 中的一个个标签,例如
<title>The Dormouse's story</title>
<a class="sister" href="http://example.com/elsie" id="link1">Elsie</a>
上面的 title a 等等 HTML 标签加上里面包括的内容就是 Tag,下面我们来感受一下怎样用 Beautiful Soup 来方便地获取 Tags 下面每一段代码中注释部分即为运行结果
print soup.title
#<title>The Dormouse's story</title>
print soup.head
#<head><title>The Dormouse's story</title></head>
print soup.a
#<a class="sister" href="http://example.com/elsie" id="link1"><!-- Elsie --></a>
print soup.p
#<p class="title" name="dromouse"><b>The Dormouse's story</b></p>
我们可以利用 soup 加标签名轻松地获取这些标签的内容,是不是感觉比正则表达式方便多了?不过有一点是,它查找的是在所有内容中的第一个符合要求的标签,如果要查询所有的标签,我们在后面进行介绍。 我们可以验证一下这些对象的类型
print type(soup.a)
#<class 'bs4.element.Tag'>
对于 Tag,它有两个重要的属性,是 name 和 attrs,下面我们分别来感受一下 name
print soup.name
print soup.head.name
#[document]
#head
soup 对象本身比较特殊,它的 name 即为 [document],对于其他内部标签,输出的值便为标签本身的名称。 attrs
print soup.p.attrs
#{'class': ['title'], 'name': 'dromouse'}
在这里,我们把 p 标签的所有属性打印输出了出来,得到的类型是一个字典。 如果我们想要单独获取某个属性,可以这样,例如我们获取它的 class 叫什么
print soup.p['class']
#['title']
还可以这样,利用 get 方法,传入属性的名称,二者是等价的
print soup.p.get('class')
#['title']
我们可以对这些属性和内容等等进行修改,例如
soup.p['class']="newClass"
print soup.p
#<p class="newClass" name="dromouse"><b>The Dormouse's story</b></p>
还可以对这个属性进行删除,例如
del soup.p['class']
print soup.p
#<p name="dromouse"><b>The Dormouse's story</b></p>
不过,对于修改删除的操作,不是我们的主要用途,在此不做详细介绍了,如果有需要,请查看前面提供的官方文档
(2)NavigableString
既然我们已经得到了标签的内容,那么问题来了,我们要想获取标签内部的文字怎么办呢?很简单,用 .string 即可,例如
print soup.p.string
#The Dormouse's story
这样我们就轻松获取到了标签里面的内容,想想如果用正则表达式要多麻烦。它的类型是一个 NavigableString,翻译过来叫 可以遍历的字符串,不过我们最好还是称它英文名字吧。 来检查一下它的类型
print type(soup.p.string)
#<class 'bs4.element.NavigableString'>
(3)BeautifulSoup
BeautifulSoup 对象表示的是一个文档的全部内容。大部分时候,可以把它当作 Tag 对象,是一个特殊的 Tag,我们可以分别获取它的类型,名称,以及属性来感受一下
print type(soup.name)
#<type 'unicode'>
print soup.name
# [document]
print soup.attrs
#{} 空字典
(4)Comment
Comment 对象是一个特殊类型的 NavigableString 对象,其实输出的内容仍然不包括注释符号,但是如果不好好处理它,可能会对我们的文本处理造成意想不到的麻烦。 我们找一个带注释的标签
print soup.a
print soup.a.string
print type(soup.a.string)
运行结果如下
<a class="sister" href="http://example.com/elsie" id="link1"><!-- Elsie --></a>
Elsie
<class 'bs4.element.Comment'>
a 标签里的内容实际上是注释,但是如果我们利用 .string 来输出它的内容,我们发现它已经把注释符号去掉了,所以这可能会给我们带来不必要的麻烦。 另外我们打印输出下它的类型,发现它是一个 Comment 类型,所以,我们在使用前最好做一下判断,判断代码如下
if type(soup.a.string)==bs4.element.Comment:
print soup.a.string
上面的代码中,我们首先判断了它的类型,是否为 Comment 类型,然后再进行其他操作,如打印输出。
上面是一些常用的简单操作,如果需要详情,请参考:https://cuiqingcai.com/1319.html
argparse介绍
argparse 是python自带的命令行参数解析包,可以用来方便地读取命令行参数,当你的代码需要频繁地修改参数的时候,使用这个工具可以将参数和代码分离开来,让你的代码更简洁,适用范围更广。
argparse使用比较简单,常用的功能可能较快地实现出来,下面我分几个步骤,以Python3为例,逐渐递增地讲述argparse的用法。
1.基本框架
下面是使用argparse从命令行获取用户名,然后打印’Hello ‘+ 用户名,假设python文件名为print_name.py:
# file-name:print_name.py
import argparse
def get_parser():
parser = argparse.ArgumentParser(description="Demo of argparse")
parser.add_argument('--name', default='Great')
return parser
if __name__ == '__main__':
parser = get_parser()
args = parser.parse_args()
name = args.name
print('Hello {}'.format(name))
在命令行执行如下命令:
$ python print_name.py --name Wang
Hello Wang
上面的代码段中,我们显示引入了argparse包,然后通过argparse.ArgumentParser函数生成argparse对象,其中这个函数的description函数表示在命令行显示帮助信息的时候,这个程序的描述信息。之后我们通过对象的add_argument函数来增加参数。这里我们只增加了一个–name的参数,然后后面的default参数表示如果没提供参数,我们默认采用的值。即如果像下面这样执行命令:
$ python print_name.py
则输出是:
$ Hello Great
最后我们通过argpaser对象的parser_args函数来获取所有参数args,然后通过args.name的方式得到我们设置的–name参数的值,可以看到这里argparse默认的参数名就是–name形式里面–后面的字符串。
整个流程就是这样,下面我们详细讲解add_argument函数的一些最常用的参数,使得你看完这个教程之后,能完成科研和工作中的大部分命令解析任务。
2. default:没有设置值情况下的默认参数
如同上例中展示的,default表示命令行没有设置该参数的时候,程序中用什么值来代替。
3. required: 表示这个参数是否一定需要设置
如果设置了required=True,则在实际运行的时候不设置该参数将报错:
parser.add_argument('-name', required=True)
则运行下面的命令会报错:
$ python print_name.py
usage: print_name.py [-h] --name NAME
print_name.py: error: argument --name is required
4. type:参数类型
默认的参数类型是str类型,如果你的程序需要一个整数或者布尔型参数,你需要设置type=int或type=bool,下面是一个打印平方的例子:
#name: square.py
import argparse
def get_parser():
parser = argparse.ArgumentParser(
description='Calculate square of a given number')
parser.add_argument('-number', type=int)
return parser
if __name__ == '__main__':
parser = get_parser()
args = parser.parse_args()
res = args.number ** 2
print('square of {} is {}'.format(args.number, res))
执行:
$ python square.py -number 5
square of 5 is 25
5. choices:参数值只能从几个选项里面选择
如下面的代码:
# file-name: choices.py
import argparse
def get_parser():
parser = argparse.ArgumentParser(
description='choices demo')
parser.add_argument('-arch', required=True, choices=['alexnet', 'vgg'])
return parser
if __name__ == '__main__':
parser = get_parser()
args = parser.parse_args()
print('the arch of CNN is '.format(args.arch))
如果像下面这样执行会报错:
$ python choices.py -arch resnet
usage: choices.py [-h] -arch {alexnet,vgg}
choices.py: error: argument -arch: invalid choice: 'resnet' (choose from 'alexnet', 'vgg')
因为我们所给的-arch参数resnet不在备选的choices之中,所以会报错
6. help:指定参数的说明信息
在现实帮助信息的时候,help参数的值可以给使用工具的人提供该参数是用来设置什么的说明,对于大型的项目,help参数和很有必要的,不然使用者不太明白每个参数的含义,增大了使用难度。
下面是个例子:
# file-name: help.py
import argparse
def get_parser():
parser = argparse.ArgumentParser(
description='help demo')
parser.add_argument('-arch', required=True, choices=['alexnet', 'vgg'],
help='the architecture of CNN, at this time we only support alexnet and vgg.')
return parser
if __name__ == '__main__':
parser = get_parser()
args = parser.parse_args()
print('the arch of CNN is '.format(args.arch))
在命令行加-h或–help参数运行该命令,获取帮助信息的时候,结果如下:
$ python help.py -h
usage: help.py [-h] -arch {alexnet,vgg}
choices demo
optional arguments:
-h, --help show this help message and exit
-arch {alexnet,vgg} the architecture of CNN, at this time we only support
alexnet and vgg.
7. dest:设置参数在代码中的变量名
argparse默认的变量名是–或-后面的字符串,但是你也可以通过dest=xxx来设置参数的变量名,然后在代码中用args.xxx来获取参数的值。
8. nargs: 设置参数在使用可以提供的个数
使用方式如下:
parser.add_argument('-name', nargs=x)
其中x的候选值和含义如下:
值 含义
N 参数的绝对个数(例如:3)
'?' 0或1个参数
'*' 0或所有参数
'+' 所有,并且至少一个参数
如下例子:
# file-name: nargs.py
import argparse
def get_parser():
parser = argparse.ArgumentParser(
description='nargs demo')
parser.add_argument('-name', required=True, nargs='+')
return parser
if __name__ == '__main__':
parser = get_parser()
args = parser.parse_args()
names = ', '.join(args.name)
print('Hello to {}'.format(names))
执行命令和结果如下:
$ python nargs.py -name A B C
Hello to A, B, C
详情可参考:
http://blog.xiayf.cn/2013/03/30/argparse/
https://docs.python.org/3/library/argparse.html
最后笔者把这几个功能实现的otter同步代码给大家展示一下,希望对每个人都有帮助
#otter.yaml
ORIGIN_USER: root
ORIGIN_PASSWORD: 123456
ORIGIN_PORT: 3306
TARGET_USER: root
TARGET_PASSWORD: 123456
TARGET_PORT: 3306
ORIGIN_DATABASE: keystone
TARGET_DATABASE: keystone
ENDPOINT: http://10.121.118.26:30113
# encoding=utf-8
#!/usr/bin/env python
#sync-otter.py
import argparse
import logging
import re
import requests
import yaml
from bs4 import BeautifulSoup
logging.basicConfig(format='%(message)s', level=logging.INFO)
class otter_sync():
def __init__(self):
self._csrf_token = None
self.headers = {
'Cookie': None,
}
self.origin_id = None
self.target_id = None
self.data_origin_table_id = None
self.data_target_table_id = None
self.canal_id = None
self.channel_id = None
self.pipeline_id = None
self.ip_canal = None
self.name = None
self.origin_name = None
self.target_name = None
self.canal_name = None
self.channel_name = None
self.pipeline_name = None
self.origin_user = None
self.origin_password = None
self.origin_url = None
self.target_user = None
self.target_password = None
self.target_url = None
self.origin_database = None
self.target_database = None
self.endpoint = None
self.html_table_unify = []
self.otter_tables = {}
def otter_sync(self):
self.disponse_otter_yaml()
self.dispose_name()
# self.add_zookper(IP_ZOOKEEPER='10.121.121.241:3306;')
# self.add_node(IP_NODE='127.0.0.1', PORT_NODE=2089)
self.otter_login(url=self.endpoint + '/login.htm')
self.otter_csrf(url=self.endpoint + '/addDataSource.htm')
self.add_data_origin(name=self.name + '_origin', mysql_user=self.origin_user,
mysql_password=self.origin_password,
data_url=self.origin_url,
url=self.endpoint + '/addDataSource.htm')
self.dispose_id(name=self.name + '_origin', url=self.endpoint + '/data_source_list.htm',
is_data_origin=True, data_source=True)
self.add_data_origin(name=self.name + '_target', mysql_user=self.target_user,
mysql_password=self.target_password,
data_url=self.target_url,
url=self.endpoint + '/addDataSource.htm')
self.dispose_id(name=self.name + '_target', url=self.endpoint + '/data_source_list.htm',
is_data_target=True, data_source=True)
self.add_data_table(origin_database=self.origin_database, source_name=self.origin_name,
origin_id=self.origin_id, url=self.endpoint + '/addDataMedia.htm')
self.dispose_id(origin_database_name=self.origin_database, origin_name=self.origin_name,
url=self.endpoint + '/data_media_list.htm',
is_data_origin_table=True, is_data_table=True)
self.add_data_table(origin_database=self.target_database, source_name=self.target_name,
origin_id=self.target_id, url=self.endpoint + '/addDataMedia.htm')
self.dispose_id(target_database_name=self.target_database, target_name=self.target_name,
url=self.endpoint + '/data_media_list.htm',
is_data_target_table=True, is_data_table=True)
self.add_canal(name=self.name + '_canal', ip_canal=self.ip_canal, mysql_user=self.origin_user,
mysql_password=self.origin_password,
url=self.endpoint + '/add_canal.htm')
self.add_channel(name=self.name + '_channel', url=self.endpoint + '/add_channel.htm')
self.dispose_id(url=self.endpoint + '/channel_list.htm', is_channel=True)
self.add_pipeline(name=self.name + '_pipeline',
url=self.endpoint + '/add_pipeline.htm?channelId=' + self.channel_id)
self.dispose_id(url=self.endpoint + '/pipelineList.htm?channelId=' + self.channel_id,
is_pipeline=True)
self.add_data_media_pair(
url=self.endpoint + '/add_data_media_pair.htm?pipelineId=' + self.pipeline_id)
self.start_sync_task(
url=self.endpoint + '/?action=channelAction&channelId=' + self.channel_id + '&status=start&pageIndex=1&searchKey=&eventSubmitDoStatus=true')
def get_parser(self):
parser = argparse.ArgumentParser(description="Gets parameters for otter synchronization")
parser.add_argument('--ORIGIN_USER', type=str, default='root',
help='参数表示源数据库的用户名,默认为root.')
parser.add_argument('--ORIGIN_PASSWORD', type=int,
help='参数表示源数据库的密码.')
parser.add_argument('--ORIGIN_IP', type=str,
help='参数表示源数据库IP,如:127.0.0.1.')
parser.add_argument('--ORIGIN_PORT', type=str,
help='参数表示源数据库PORT,如:3306.')
parser.add_argument('--TARGET_USER', type=str, default='root',
help='参数表示源数据库的用户名,默认为root.')
parser.add_argument('--TARGET_PASSWORD', type=int,
help='参数表示目标数据库的密码.')
parser.add_argument('--TARGET_IP', type=str,
help='参数表示目标数据库IP,如:127.0.0.1.')
parser.add_argument('--TARGET_PORT', type=str,
help='参数表示目标数据库PORT,如:3306.')
parser.add_argument('--ORIGIN_DATABASE', type=str, default='keystone',
help='参数表示源数据库的名称,默认为keystone.')
parser.add_argument('--TARGET_DATABASE', type=str, default='keystone',
help='参数表示目标数据库的名称,默认为keystone.')
parser.add_argument('--ENDPOINT', type=str,
help='该参数表示终端的IP地址,例如http://127.0.0.1:30113.')
return parser
def otter_login(self, url=None):
data = {
'_fm.l._0.n': 'admin',
'_fm.l._0.p': 'admin',
'action': 'user_action',
'event_submit_do_login': 1
}
#
resp = requests.post(url=url, data=data)
if not resp.status_code == 200:
raise Exception(resp.text)
if not resp.history[0].headers['Set-Cookie']:
raise Exception('The token is not obtained. Procedure.')
JSESSIONID = resp.history[0].headers['Set-Cookie'].split(';')
OTTER_WEBX_JSESSIONID0 = JSESSIONID[2].split(',')
Set_Cookie = JSESSIONID[0] + ';' + OTTER_WEBX_JSESSIONID0[1]
self.headers['Cookie'] = Set_Cookie
if self.headers.get('Cookie') == None:
raise Exception('No Cookie was obtained.')
def otter_csrf(self, url=None):
resp = requests.get(url=url, headers=self.headers)
if not resp.status_code == 200:
raise Exception(resp.text)
html = resp.text
csrf_token = BeautifulSoup(html)
csrf_token.input['self.name'] = '_csrf_token'
res = csrf_token.input
self._csrf_token = res['value']
if self._csrf_token == None:
raise Exception('The CSRF token is not obtained. Procedure.')
def disponse_otter_yaml(self):
with open('otter.yaml', 'r') as f:
otter_yaml = f.read()
configuration_data = yaml.load(otter_yaml)
regex = re.compile(
r'^(?:http|ftp)s?://' # http:// or https:// or jdc:mysql://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
regex_ip = re.compile(
r'^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}$', re.IGNORECASE) # ...or ip
if 'ORIGIN_USER' not in configuration_data or 'ORIGIN_PASSWORD' not in configuration_data \
or 'ORIGIN_PORT' not in configuration_data or 'TARGET_USER' not in configuration_data or \
'TARGET_PASSWORD' not in configuration_data or 'TARGET_PORT' not in configuration_data or \
'ORIGIN_DATABASE' not in configuration_data or 'TARGET_DATABASE' not in configuration_data \
or 'ENDPOINT' not in configuration_data:
raise Exception('''The configuration item variable is changed. Please refer to the template to fill in:
{
"ORIGIN_USER":"root",
"ORIGIN_PASSWORD":123456,
"ORIGIN_PORT":3306,
"TARGET_USER":"root",
"TARGET_PASSWORD":123456,
"TARGET_PORT":3306,
"ORIGIN_DATABASE":"default",
"TARGET_DATABASE":"default",
"ENDPOINT":"http://127.0.0.1:30113"
}''')
parser = self.get_parser()
args = parser.parse_args()
if args.ORIGIN_USER:
self.origin_user = args.ORIGIN_USER
elif configuration_data['ORIGIN_USER'] != None:
self.origin_user = configuration_data['ORIGIN_USER']
else:
raise Exception(
'The ORIGIN_USER parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')
if args.ORIGIN_PASSWORD:
self.origin_password = args.ORIGIN_PASSWORD
elif configuration_data['ORIGIN_PASSWORD'] != None:
self.origin_password = configuration_data['ORIGIN_PASSWORD']
else:
raise Exception(
'The ORIGIN_PASSWORD parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')
if args.ORIGIN_IP and not regex_ip.match(args.ORIGIN_IP):
raise Exception(
'The ORIGIN IP parameter format is incorrect, for example:127.0.0.1')
elif args.ORIGIN_IP and regex_ip.match(args.ORIGIN_IP) and args.ORIGIN_PORT:
self.origin_url = 'jdbc:mysql://' + args.ORIGIN_IP + ':' + args.ORIGIN_PORT
self.ip_canal = args.ORIGIN_IP + ':' + args.ORIGIN_PORT + ';'
elif args.ORIGIN_IP and regex_ip.match(args.ORIGIN_IP) and configuration_data['ORIGIN_PORT'] != None:
self.origin_url = 'jdbc:mysql://' + args.ORIGIN_IP + ':' + str(configuration_data['ORIGIN_PORT'])
self.ip_canal = args.ORIGIN_IP + ':' + str(configuration_data['ORIGIN_PORT']) + ';'
else:
raise Exception(
'The ORIGIN PORT parameter was not found or the ORIGIN IP format is incorrect, for example, 127.0.0.1')
if args.TARGET_USER:
self.target_user = args.TARGET_USER
elif configuration_data['TARGET_USER'] != None:
self.target_user = configuration_data['TARGET_USER']
else:
raise Exception(
'The TARGET_USER parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')
if args.TARGET_PASSWORD:
self.target_password = args.TARGET_PASSWORD
elif configuration_data['TARGET_PASSWORD'] != None:
self.target_password = configuration_data['TARGET_PASSWORD']
else:
raise Exception(
'The TARGET_PASSWORD parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')
if args.TARGET_IP and not regex_ip.match(args.TARGET_IP):
raise Exception(
'The format of the TARGET IP parameter is incorrect, for example:127.0.0.1')
elif args.TARGET_IP and regex_ip.match(args.TARGET_IP) and args.TARGET_PORT:
self.target_url = 'jdbc:mysql://' + args.TARGET_IP + ':' + args.TARGET_PORT
elif args.TARGET_IP and regex_ip.match(args.TARGET_IP) and configuration_data['TARGET_PORT'] != None:
self.target_url = 'jdbc:mysql://' + args.TARGET_IP + ':' + str(configuration_data['TARGET_PORT'])
else:
raise Exception(
'The TARGET PORT parameter was not found or the TARGET IP format is incorrect, such as 127.0.0.1')
if args.ORIGIN_DATABASE:
self.origin_database = args.ORIGIN_DATABASE
elif configuration_data['ORIGIN_DATABASE'] != None:
self.origin_database = configuration_data['ORIGIN_DATABASE']
else:
raise Exception(
'The ORIGIN_DATABASE parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')
if args.TARGET_DATABASE:
self.target_database = args.TARGET_DATABASE
elif configuration_data['TARGET_DATABASE'] != None:
self.target_database = configuration_data['TARGET_DATABASE']
else:
raise Exception(
'The TARGET_DATABASE parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')
if args.ENDPOINT and not regex.match(args.ENDPOINT):
raise Exception(
'The format of the ENDPOINT parameter is incorrect, for example:http://127.0.0.1:8081')
if args.ENDPOINT and regex.match(args.ENDPOINT):
self.endpoint = args.ENDPOINT
elif configuration_data['ENDPOINT'] != None and regex.match(
configuration_data['ENDPOINT']):
self.endpoint = configuration_data['ENDPOINT']
else:
raise Exception(
'No ENDPOINT parameter found or format error, for example:http://127.0.0.1:8081')
def dispose_name(self):
master_url = self.origin_url.split(':')
slave_url = self.target_url.split(':')
origin_url_name = master_url[2]
target_url_name = slave_url[2]
self.name = origin_url_name.split('.')[3] + '_' + target_url_name.split('.')[3]
def dispose_id(self, name=None, url=None, origin_database_name=None, origin_name=None,
target_database_name=None, target_name=None, is_data_origin=False, data_source=False,
is_data_table=False, is_canal=False, is_channel=False, is_pipeline=False, is_data_target=False,
is_data_origin_table=False, is_data_target_table=False):
res = requests.get(url=url, headers=self.headers)
html = res.text
soup = BeautifulSoup(html)
datas_table_origin = soup.table
datas_table = []
for data in datas_table_origin.stripped_strings:
datas_table.append(data)
if data_source or is_canal:
self.html_table_unify = [i for i in datas_table[6:] if
(i != '查看') and (i != '|') and (i != '编辑') and (i != '删除')]
self.otter_tables = {
'dispose_datas': [self.html_table_unify[i:i + 5] for i in
range(0, len(self.html_table_unify), 5)],
'data_key': ['id', 'name', 'database', 'coding', 'url']}
elif is_data_table:
self.html_table_unify = [i for i in datas_table[6:] if
(i != '查看') and (i != '|') and (i != '编辑') and (i != '删除')]
self.otter_tables = {
'dispose_datas': [self.html_table_unify[i:i + 5] for i in
range(0, len(self.html_table_unify), 5)],
'data_key': ['id', 'name', 'table', 'source_name', 'coding']}
elif is_channel:
self.html_table_unify = [i for i in datas_table[5:] if
(i != '查看') and (i != '|') and (i != '编辑') and (i != '删除') and (i != '停止') and (
i != '无') and (
i != '启用') and (
i != '推送') and (i != '单向')]
self.otter_tables = {
'dispose_datas': [self.html_table_unify[i:i + 2] for i in range(0, len(self.html_table_unify), 2)],
'data_key': ['id', 'name']}
elif is_pipeline:
self.html_table_unify = [i for i in datas_table[10:] if
(i != '查看') and (i != '|') and (i != '编辑') and (i != '删除') and (i != '监控') and (
i != '日志')]
self.otter_tables = {
'dispose_datas': [self.html_table_unify[i:i + 7] for i in range(0, len(self.html_table_unify), 7)],
'data_key': ['id', 'name', 'parallel_data', 'home_site', 'mainstem_state', 'delay_time',
'monitoring_num']}
data_key = self.otter_tables.get('data_key', '')
data_origin = []
for data_value in self.otter_tables.get('dispose_datas'):
data_origin.append(dict(zip(data_key, data_value)))
for data in data_origin:
if name == data.get('name', None) and is_data_origin:
self.origin_id = data.get('id', None)
self.origin_name = name
if self.origin_id == None:
raise Exception('The data source ID was not obtained. Procedure')
elif name == data.get('name', None) and is_data_target:
self.target_id = data.get('id', None)
self.target_name = name
if self.target_id == None:
raise Exception('The target ID was not obtained.')
elif origin_database_name == data.get('name', None) and origin_name == data.get('source_name',
None) and is_data_origin_table:
self.data_origin_table_id = data.get('id', None)
if self.data_origin_table_id == None:
raise Exception('The DATA Origin table ID was not found.')
elif target_database_name == data.get('name', None) and target_name == data.get('source_name',
None) and is_data_target_table:
self.data_target_table_id = data.get('id', None)
if self.data_target_table_id == None:
raise Exception('The DATA Target table ID was not found.')
elif data.get('name', None) == self.canal_name and is_canal:
self.canal_id = data.get('id', None)
if self.canal_id == None:
raise Exception('The Canal ID was not obtained.')
elif self.channel_name == data.get('name', None) and is_channel:
self.channel_id = data.get('id', None)
if self.channel_id == None:
raise Exception('The Channel ID was not obtained')
elif self.pipeline_name == data.get('name', None) and is_pipeline:
self.pipeline_id = data.get('id', None)
if self.pipeline_id == None:
raise Exception('Pipeline ID not obtained')
# def add_zookper(self, IP_ZOOKEEPER=None):
# url = 'http://10.121.118.26:30113/add_zookeeper.htm'
# data = {
# '_csrf_token': self._csrf_token,
# 'action': 'auto_keeper_cluster_action',
# 'event_submit_do_add': 1,
# '_fm.a._0.c': 'kevin222',
# '_fm.a._0.z': IP_ZOOKEEPER,
# '_fm.a._0.d': 'add zookeeper.',
#
# }
# resp = requests.post(url=url, data=data, headers=self.headers)
# if not resp.status_code == 200:
# # raise Exception('An internal error occurs on the server when Zookeeper is added')
# raise Exception(resp.text)
#
# def add_node(self, IP_NODE=None, PORT_NODE=None):
# url = 'http://10.121.118.26:30113/add_node.htm'
# data = {
# '_csrf_token': self._csrf_token,
# 'action': 'node_action',
# 'event_submit_do_add': 1,
# '_fm.n._0.n': 'kevin',
# '_fm.n._0.ip': IP_NODE,
# '_fm.n._0.p': PORT_NODE,
# '_fm.no._0.u': False,
# '_fm.no._0.a': 1,
# '_fm.n._0.d': 'add node.'
#
# }
# resp = requests.post(url=url, data=data, headers=self.headers)
# if not resp.status_code == 200:
# raise Exception(resp.text)
def add_data_origin(self, name=None, mysql_user=None, mysql_password=None, data_url=None, url=None):
data = {
'_csrf_token': self._csrf_token,
'action': 'data_media_source_action',
'event_submit_do_add': 1,
'_fm.d._0.n': name,
'_fm.d._0.t': 'MYSQL',
'_fm.d._0.u': mysql_user,
'_fm.d._0.p': mysql_password,
'_fm.d._0.ur': data_url,
'_fm.d._0.e': 'UTF8'
}
resp = requests.post(url=url, data=data, headers=self.headers)
if not resp.status_code == 200:
raise Exception(resp.text)
def add_data_table(self, origin_database=None, source_name=None, origin_id=None, url=None):
data = {
'_csrf_token': self._csrf_token,
'action': 'data_media_action',
'event_submit_do_add': 1,
'_fm.da._0.na': origin_database,
'_fm.da._0.n': '(.*).(.*)',
'_fm.da._0.s': source_name,
'_fm.da._0.so': origin_id
}
resp = requests.post(url=url, data=data, headers=self.headers)
if not resp.status_code == 200:
raise Exception(resp.text)
def add_canal(self, name=None, ip_canal=None, mysql_user=None, mysql_password=None, url=None):
data = {
'_csrf_token': self._csrf_token,
'action': 'canal_action',
'event_submit_do_add': 1,
'_fm.ca._0.n': name,
'_fm.can._0.r': 'EMBEDDED',
'_fm.can._0.a': 1,
'_fm.can._0.s': 'MYSQL',
'_fm.can._0.g': ip_canal,
'_fm.can._0.d': mysql_user,
'_fm.can._0.db': mysql_password,
'_fm.can._0.co': 'UTF-8',
'_fm.can._0.gt': False,
'_fm.can._0.t': False,
'_fm.can._0.st': 'MEMORY',
'_fm.can._0.sto': 'MEMSIZE',
'_fm.can._0.me': 32768,
'_fm.can._0.mem': 1024,
'_fm.can._0.h': 'HEARTBEAT',
'_fm.can._0.det': False,
'_fm.can._0.dete': 'insert into retl.xdual values(1,now()) on duplicate key update x=now()',
'_fm.can._0.detec': 5,
'_fm.can._0.detect': 30,
'_fm.can._0.detecti': 3,
'_fm.can._0.he': False,
'_fm.can._0.m': 'MIXED',
'_fm.can._0.in': 'MEMORY_META_FAILBACK',
'_fm.can._0.po': 11111,
'_fm.can._0.de': 30,
'_fm.can._0.re': 16384,
'_fm.can._0.se': 16384,
'_fm.can._0.fa': 60,
}
resp = requests.post(url=url, data=data, headers=self.headers)
if not resp.status_code == 200:
raise Exception(resp.text)
self.canal_name = name
def add_channel(self, name=None, url=None):
data = {
'_csrf_token': self._csrf_token,
'action': 'channel_action',
'event_submit_do_add': 1,
'_fm.ch._0.n': name,
'_fm.cha._0.sy': 'BASE',
'_fm.cha._0.s': 'ROW',
'_fm.cha._0.e': True,
'_fm.cha._0.re': 'LOOPBACK',
'_fm.cha._0.r': 60,
'_fm.ch._0.d': 'add channel.'
}
resp = requests.post(url=url, data=data, headers=self.headers)
if not resp.status_code == 200:
raise Exception(resp.text)
self.channel_name = name
def add_pipeline(self, name=None, url=None):
data = {
'_csrf_token': self._csrf_token,
'action': 'pipeline_action',
'event_submit_do_add': 1,
'_fm.p._0.i': '$pipeline.id',
'_fm.p._0.c': self.channel_id,
'channelId': self.channel_id,
'_fm.p._0.n': name,
'_fm.p._0.s': 1,
'_fm.p._0.l': 1,
'_fm.pi._0.pa': 5,
'_fm.pi._0.e': 10,
'_fm.pi._0.l': 15,
'_fm.pi._0.f': 15,
'_fm.pi._0.h': False,
'_fm.pi._0.se': 'Canal',
'_fm.pi._0.de': self.canal_name,
'_fm.pi._0.m': 6000,
'_fm.pi._0.b': -1,
'_fm.p._0.d': 'add pipeline.'
}
resp = requests.post(url=url, data=data, headers=self.headers)
if not resp.status_code == 200:
raise Exception(resp.text)
self.pipeline_name = name
def add_data_media_pair(self, url=None):
data = {
'_csrf_token': self._csrf_token,
'action': 'data_media_pair_action',
'event_submit_do_add': 1,
'_fm.dat._0.pi': self.pipeline_id,
'pipelineId': self.pipeline_id,
'channelId': self.channel_id,
'_fm.dat._0.s': '(.*).(.*)',
'_fm.dat._0.so': self.data_origin_table_id,
'_fm.dat._0.t': '(.*).(.*)',
'_fm.dat._0.ta': self.data_target_table_id,
'_fm.dat._0.pu': 5,
'_fm.dat._0.c': 'INCLUDE',
'_fm.dat._0.fi': 'CLAZZ',
'_fm.dat._0.r': 'CLAZZ',
'submitKey': '保存'
}
resp = requests.post(url=url, data=data, headers=self.headers)
if not resp.status_code == 200:
raise Exception(resp.text)
def start_sync_task(self, url=None):
resp = requests.get(url=url,
headers=self.headers)
if not resp.status_code == 200:
raise Exception(resp.text)
logging.info(">>>>>>>>>>>>脚本运行成功")
if __name__ == '__main__':
task = otter_sync()
task.otter_sync()
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)