1.otter介绍

1.1otter是什么?

Ottter是由阿里开源的一个数据同步产品,它的最初的目的是为了解决跨国异地机房双A架构,两边可写的场景,开发时间从2011年7月份一直持续到现在,目前阿里巴巴B2B内部的本地/异地机房的同步需求基本全上了Otter。
Otter基于数据库增量日志解析,支持mysql/oracle数据库进行同步,在最新的v4.2.13已经支持mysql5.7以及阿里云提供的RDS数据库(使用RDS童鞋的福音)

1.2otter工作原理
  • List item
  • List item
  • db : 数据源以及需要同步到的库
  • Canal : 用户获取数据库增量日志
  • manager : 配置同步规则设置数据源同步源等
  • zookeeper : 协调node进行协调工作
  • node : 负责任务处理处理接受到的部分同步工作
1.3otter的特性
  • 使用纯JAVA开发,占时资源比较高
  • 基于Canal获取数据库增量日志,Canal是阿里爸爸另外一个开源产品
  • 使用manager(web管理)+node(工作节点),manager负责配置监控,node负责处理任务
  • 基于zookeeper,解决分布式状态调度的,允许多node节点之间协同工作
  • 使用aria2多线程传输技术,对网络依赖带宽依赖较低
1.4otter能解决什么问题

1、异构库同步

Otter支持从Mysql同步到Mysql/oracle,我们可以把mysql同步到oracle

2、单机房同步

可以作为一主多从同步方案,对于单机房内网来说效率非常高,还可以做为数据库版本升级,数据表迁移,二级索引等这类功能

3、异地机房同步

异地机房同步可以说是Otter最大的亮点之一,可以解决国际化问题把数据从国内同步到国外来提供用户使用,在国内场景可以做到数据多机房容灾

4、双向同步

双向同步是在数据同步中最难搞的一种场景,Otter可以很好的应对这种场景,Otter有避免回环算法和数据一致性算法两种特性,保证双A机房模式下,数据保证最终一致性

5、文件同步

站点镜像,进行数据复制的同时,复制关联的图片,比如复制产品数据,同时复制产品图片

1.5安装otter

具体安装参考:https://blog.csdn.net/dong_007_007/article/details/78643159

2.Beautiful Soup 的简介

简单来说,Beautiful Soup 是 python 的一个库,最主要的功能是从网页抓取数据。官方解释如下:
Beautiful Soup 提供一些简单的、python 式的函数用来处理导航、搜索、修改分析树等功能。它是一个工具箱,通过解析文档为用户提供需要抓取的数据,因为简单,所以不需要多少代码就可以写出一个完整的应用程序。 Beautiful Soup 自动将输入文档转换为 Unicode 编码,输出文档转换为 utf-8 编码。你不需要考虑编码方式,除非文档没有指定一个编码方式,这时,Beautiful Soup 就不能自动识别编码方式了。然后,你仅仅需要说明一下原始编码方式就可以了。 Beautiful Soup 已成为和 lxml、html6lib 一样出色的 python 解释器,为用户灵活地提供不同的解析策略或强劲的速度。

2.1Beautiful Soup 安装

Beautiful Soup 3 目前已经停止开发,推荐在现在的项目中使用 Beautiful Soup 4,不过它已经被移植到 BS4 了,也就是说导入时我们需要 import bs4 。所以这里我们用的版本是 Beautiful Soup 4.3.2 (简称 BS4),另外据说 BS4 对 Python3 的支持不够好,不过我用的是 Python2.7.7,如果有小伙伴用的是 Python3 版本,可以考虑下载 BS3 版本。 可以利用 pip 或者 easy_install 来安装,以下两种方法均可

pip install beautifulsoup4

Beautiful Soup 支持 Python 标准库中的 HTML 解析器,还支持一些第三方的解析器,如果我们不安装它,则 Python 会使用 Python 默认的解析器,lxml 解析器更加强大,速度更快,推荐安装。

2.2Beautiful Soup 使用

首先必须要导入 bs4 库

from bs4 import BeautifulSoup

我们创建一个字符串,后面的例子我们便会用它来演示

html = """
<html><head><title>The Dormouse's story</title></head>
<body>
<p class="title" name="dromouse"><b>The Dormouse's story</b></p>
<p class="story">Once upon a time there were three little sisters; and their names were
<a href="http://example.com/elsie" class="sister" id="link1"><!-- Elsie --></a>,
<a href="http://example.com/lacie" class="sister" id="link2">Lacie</a> and
<a href="http://example.com/tillie" class="sister" id="link3">Tillie</a>;
and they lived at the bottom of a well.</p>
<p class="story">...</p>
"""

创建 beautifulsoup 对象

soup = BeautifulSoup(html)

另外,我们还可以用本地 HTML 文件来创建对象,例如

soup = BeautifulSoup(open('index.html'))

上面这句代码便是将本地 index.html 文件打开,用它来创建 soup 对象 下面我们来打印一下 soup 对象的内容,格式化输出

print soup.prettify()
<html>
 <head>
  <title>
   The Dormouse's story
  </title>
 </head>
 <body>
  <p class="title" name="dromouse">
   <b>
    The Dormouse's story
   </b>
  </p>
  <p class="story">
   Once upon a time there were three little sisters; and their names were
   <a class="sister" href="http://example.com/elsie" id="link1">
    <!-- Elsie -->
   </a>
   ,
   <a class="sister" href="http://example.com/lacie" id="link2">
    Lacie
   </a>
   and
   <a class="sister" href="http://example.com/tillie" id="link3">
    Tillie
   </a>
   ;
and they lived at the bottom of a well.
  </p>
  <p class="story">
   ...
  </p>
 </body>
</html>

以上便是输出结果,格式化打印出了它的内容,这个函数经常用到,小伙伴们要记好咯。

四大对象种类

Beautiful Soup 将复杂 HTML 文档转换成一个复杂的树形结构,每个节点都是 Python 对象,所有对象可以归纳为 4 种:

  • Tag
  • NavigableString
  • BeautifulSoup
  • Comment

(1)Tag
Tag 是什么?通俗点讲就是 HTML 中的一个个标签,例如

<title>The Dormouse's story</title>
<a class="sister" href="http://example.com/elsie" id="link1">Elsie</a>

上面的 title a 等等 HTML 标签加上里面包括的内容就是 Tag,下面我们来感受一下怎样用 Beautiful Soup 来方便地获取 Tags 下面每一段代码中注释部分即为运行结果

print soup.title
#<title>The Dormouse's story</title>
print soup.head
#<head><title>The Dormouse's story</title></head>
print soup.a
#<a class="sister" href="http://example.com/elsie" id="link1"><!-- Elsie --></a>
print soup.p
#<p class="title" name="dromouse"><b>The Dormouse's story</b></p>

我们可以利用 soup 加标签名轻松地获取这些标签的内容,是不是感觉比正则表达式方便多了?不过有一点是,它查找的是在所有内容中的第一个符合要求的标签,如果要查询所有的标签,我们在后面进行介绍。 我们可以验证一下这些对象的类型

print type(soup.a)
#<class 'bs4.element.Tag'>

对于 Tag,它有两个重要的属性,是 name 和 attrs,下面我们分别来感受一下 name

print soup.name
print soup.head.name
#[document]
#head

soup 对象本身比较特殊,它的 name 即为 [document],对于其他内部标签,输出的值便为标签本身的名称。 attrs

print soup.p.attrs
#{'class': ['title'], 'name': 'dromouse'}

在这里,我们把 p 标签的所有属性打印输出了出来,得到的类型是一个字典。 如果我们想要单独获取某个属性,可以这样,例如我们获取它的 class 叫什么

print soup.p['class']
#['title']

还可以这样,利用 get 方法,传入属性的名称,二者是等价的

print soup.p.get('class')
#['title']

我们可以对这些属性和内容等等进行修改,例如

soup.p['class']="newClass"
print soup.p
#<p class="newClass" name="dromouse"><b>The Dormouse's story</b></p>

还可以对这个属性进行删除,例如

del soup.p['class']
print soup.p
#<p name="dromouse"><b>The Dormouse's story</b></p>

不过,对于修改删除的操作,不是我们的主要用途,在此不做详细介绍了,如果有需要,请查看前面提供的官方文档

(2)NavigableString
既然我们已经得到了标签的内容,那么问题来了,我们要想获取标签内部的文字怎么办呢?很简单,用 .string 即可,例如

print soup.p.string
#The Dormouse's story

这样我们就轻松获取到了标签里面的内容,想想如果用正则表达式要多麻烦。它的类型是一个 NavigableString,翻译过来叫 可以遍历的字符串,不过我们最好还是称它英文名字吧。 来检查一下它的类型

print type(soup.p.string)
#<class 'bs4.element.NavigableString'>

(3)BeautifulSoup
BeautifulSoup 对象表示的是一个文档的全部内容。大部分时候,可以把它当作 Tag 对象,是一个特殊的 Tag,我们可以分别获取它的类型,名称,以及属性来感受一下

print type(soup.name)
#<type 'unicode'>
print soup.name 
# [document]
print soup.attrs 
#{} 空字典

(4)Comment
Comment 对象是一个特殊类型的 NavigableString 对象,其实输出的内容仍然不包括注释符号,但是如果不好好处理它,可能会对我们的文本处理造成意想不到的麻烦。 我们找一个带注释的标签

print soup.a
print soup.a.string
print type(soup.a.string)

运行结果如下

<a class="sister" href="http://example.com/elsie" id="link1"><!-- Elsie --></a>
 Elsie 
<class 'bs4.element.Comment'>

a 标签里的内容实际上是注释,但是如果我们利用 .string 来输出它的内容,我们发现它已经把注释符号去掉了,所以这可能会给我们带来不必要的麻烦。 另外我们打印输出下它的类型,发现它是一个 Comment 类型,所以,我们在使用前最好做一下判断,判断代码如下

if type(soup.a.string)==bs4.element.Comment:
    print soup.a.string

上面的代码中,我们首先判断了它的类型,是否为 Comment 类型,然后再进行其他操作,如打印输出。

上面是一些常用的简单操作,如果需要详情,请参考:https://cuiqingcai.com/1319.html

argparse介绍

argparse 是python自带的命令行参数解析包,可以用来方便地读取命令行参数,当你的代码需要频繁地修改参数的时候,使用这个工具可以将参数和代码分离开来,让你的代码更简洁,适用范围更广。
argparse使用比较简单,常用的功能可能较快地实现出来,下面我分几个步骤,以Python3为例,逐渐递增地讲述argparse的用法。

1.基本框架
下面是使用argparse从命令行获取用户名,然后打印’Hello ‘+ 用户名,假设python文件名为print_name.py:

# file-name:print_name.py
import argparse

def get_parser():
    parser = argparse.ArgumentParser(description="Demo of argparse")
    parser.add_argument('--name', default='Great')
    
    return parser


if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()
    name = args.name
    print('Hello {}'.format(name))

在命令行执行如下命令:

$ python print_name.py --name Wang
Hello Wang

上面的代码段中,我们显示引入了argparse包,然后通过argparse.ArgumentParser函数生成argparse对象,其中这个函数的description函数表示在命令行显示帮助信息的时候,这个程序的描述信息。之后我们通过对象的add_argument函数来增加参数。这里我们只增加了一个–name的参数,然后后面的default参数表示如果没提供参数,我们默认采用的值。即如果像下面这样执行命令:

$ python print_name.py 

则输出是:

$ Hello Great

最后我们通过argpaser对象的parser_args函数来获取所有参数args,然后通过args.name的方式得到我们设置的–name参数的值,可以看到这里argparse默认的参数名就是–name形式里面–后面的字符串。
整个流程就是这样,下面我们详细讲解add_argument函数的一些最常用的参数,使得你看完这个教程之后,能完成科研和工作中的大部分命令解析任务。

2. default:没有设置值情况下的默认参数
如同上例中展示的,default表示命令行没有设置该参数的时候,程序中用什么值来代替。

3. required: 表示这个参数是否一定需要设置
如果设置了required=True,则在实际运行的时候不设置该参数将报错:

parser.add_argument('-name', required=True)

则运行下面的命令会报错:

$ python print_name.py
usage: print_name.py [-h] --name NAME
print_name.py: error: argument --name is required

4. type:参数类型
默认的参数类型是str类型,如果你的程序需要一个整数或者布尔型参数,你需要设置type=int或type=bool,下面是一个打印平方的例子:

#name: square.py
import argparse

def get_parser():
    parser = argparse.ArgumentParser(
        description='Calculate square of a given number')
    parser.add_argument('-number', type=int)

    return parser


if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()
    res = args.number ** 2
    print('square of {} is {}'.format(args.number, res))

执行:

$ python square.py -number 5
square of 5 is 25 

5. choices:参数值只能从几个选项里面选择
如下面的代码:

# file-name: choices.py
import argparse

def get_parser():
    parser = argparse.ArgumentParser(
        description='choices demo')
    parser.add_argument('-arch', required=True, choices=['alexnet', 'vgg'])

    return parser

if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()
    print('the arch of CNN is '.format(args.arch))

如果像下面这样执行会报错:

$ python choices.py -arch resnet
usage: choices.py [-h] -arch {alexnet,vgg}
choices.py: error: argument -arch: invalid choice: 'resnet' (choose from 'alexnet', 'vgg')

因为我们所给的-arch参数resnet不在备选的choices之中,所以会报错
6. help:指定参数的说明信息
在现实帮助信息的时候,help参数的值可以给使用工具的人提供该参数是用来设置什么的说明,对于大型的项目,help参数和很有必要的,不然使用者不太明白每个参数的含义,增大了使用难度。
下面是个例子:

# file-name: help.py
import argparse

def get_parser():
    parser = argparse.ArgumentParser(
        description='help demo')
    parser.add_argument('-arch', required=True, choices=['alexnet', 'vgg'],
        help='the architecture of CNN, at this time we only support alexnet and vgg.')

    return parser


if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()
    print('the arch of CNN is '.format(args.arch))

在命令行加-h或–help参数运行该命令,获取帮助信息的时候,结果如下:

$ python help.py -h
usage: help.py [-h] -arch {alexnet,vgg}

choices demo

optional arguments:
  -h, --help           show this help message and exit
  -arch {alexnet,vgg}  the architecture of CNN, at this time we only support
                       alexnet and vgg.

7. dest:设置参数在代码中的变量名
argparse默认的变量名是–或-后面的字符串,但是你也可以通过dest=xxx来设置参数的变量名,然后在代码中用args.xxx来获取参数的值。
8. nargs: 设置参数在使用可以提供的个数
使用方式如下:

parser.add_argument('-name', nargs=x)

其中x的候选值和含义如下:

值  含义
N   参数的绝对个数(例如:3'?'   01个参数
'*'   0或所有参数
'+'   所有,并且至少一个参数

如下例子:

# file-name: nargs.py
import argparse

def get_parser():
    parser = argparse.ArgumentParser(
        description='nargs demo')
    parser.add_argument('-name', required=True, nargs='+')

    return parser


if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()
    names = ', '.join(args.name)
    print('Hello to {}'.format(names))

执行命令和结果如下:

$ python nargs.py -name A B C
Hello to A, B, C

详情可参考:
http://blog.xiayf.cn/2013/03/30/argparse/
https://docs.python.org/3/library/argparse.html

最后笔者把这几个功能实现的otter同步代码给大家展示一下,希望对每个人都有帮助

#otter.yaml
ORIGIN_USER: root
ORIGIN_PASSWORD: 123456
ORIGIN_PORT: 3306
TARGET_USER: root
TARGET_PASSWORD: 123456
TARGET_PORT: 3306
ORIGIN_DATABASE: keystone
TARGET_DATABASE: keystone
ENDPOINT: http://10.121.118.26:30113
# encoding=utf-8
#!/usr/bin/env python
#sync-otter.py

import argparse
import logging
import re
import requests
import yaml
from bs4 import BeautifulSoup

logging.basicConfig(format='%(message)s', level=logging.INFO)


class otter_sync():
    def __init__(self):
        self._csrf_token = None
        self.headers = {
            'Cookie': None,
        }
        self.origin_id = None
        self.target_id = None
        self.data_origin_table_id = None
        self.data_target_table_id = None
        self.canal_id = None
        self.channel_id = None
        self.pipeline_id = None
        self.ip_canal = None
        self.name = None
        self.origin_name = None
        self.target_name = None
        self.canal_name = None
        self.channel_name = None
        self.pipeline_name = None
        self.origin_user = None
        self.origin_password = None
        self.origin_url = None
        self.target_user = None
        self.target_password = None
        self.target_url = None
        self.origin_database = None
        self.target_database = None
        self.endpoint = None
        self.html_table_unify = []
        self.otter_tables = {}

    def otter_sync(self):
        self.disponse_otter_yaml()
        self.dispose_name()
        # self.add_zookper(IP_ZOOKEEPER='10.121.121.241:3306;')
        # self.add_node(IP_NODE='127.0.0.1', PORT_NODE=2089)

        self.otter_login(url=self.endpoint + '/login.htm')
        self.otter_csrf(url=self.endpoint + '/addDataSource.htm')
        self.add_data_origin(name=self.name + '_origin', mysql_user=self.origin_user,
                             mysql_password=self.origin_password,
                             data_url=self.origin_url,
                             url=self.endpoint + '/addDataSource.htm')

        self.dispose_id(name=self.name + '_origin', url=self.endpoint + '/data_source_list.htm',
                        is_data_origin=True, data_source=True)

        self.add_data_origin(name=self.name + '_target', mysql_user=self.target_user,
                             mysql_password=self.target_password,
                             data_url=self.target_url,
                             url=self.endpoint + '/addDataSource.htm')

        self.dispose_id(name=self.name + '_target', url=self.endpoint + '/data_source_list.htm',
                        is_data_target=True, data_source=True)

        self.add_data_table(origin_database=self.origin_database, source_name=self.origin_name,
                            origin_id=self.origin_id, url=self.endpoint + '/addDataMedia.htm')

        self.dispose_id(origin_database_name=self.origin_database, origin_name=self.origin_name,
                        url=self.endpoint + '/data_media_list.htm',
                        is_data_origin_table=True, is_data_table=True)

        self.add_data_table(origin_database=self.target_database, source_name=self.target_name,
                            origin_id=self.target_id, url=self.endpoint + '/addDataMedia.htm')

        self.dispose_id(target_database_name=self.target_database, target_name=self.target_name,
                        url=self.endpoint + '/data_media_list.htm',
                        is_data_target_table=True, is_data_table=True)

        self.add_canal(name=self.name + '_canal', ip_canal=self.ip_canal, mysql_user=self.origin_user,
                       mysql_password=self.origin_password,
                       url=self.endpoint + '/add_canal.htm')

        self.add_channel(name=self.name + '_channel', url=self.endpoint + '/add_channel.htm')

        self.dispose_id(url=self.endpoint + '/channel_list.htm', is_channel=True)

        self.add_pipeline(name=self.name + '_pipeline',
                          url=self.endpoint + '/add_pipeline.htm?channelId=' + self.channel_id)

        self.dispose_id(url=self.endpoint + '/pipelineList.htm?channelId=' + self.channel_id,
                        is_pipeline=True)

        self.add_data_media_pair(
            url=self.endpoint + '/add_data_media_pair.htm?pipelineId=' + self.pipeline_id)

        self.start_sync_task(
            url=self.endpoint + '/?action=channelAction&channelId=' + self.channel_id + '&status=start&pageIndex=1&searchKey=&eventSubmitDoStatus=true')

    def get_parser(self):
        parser = argparse.ArgumentParser(description="Gets parameters for otter synchronization")
        parser.add_argument('--ORIGIN_USER', type=str, default='root',
                            help='参数表示源数据库的用户名,默认为root.')
        parser.add_argument('--ORIGIN_PASSWORD', type=int,
                            help='参数表示源数据库的密码.')
        parser.add_argument('--ORIGIN_IP', type=str,
                            help='参数表示源数据库IP,如:127.0.0.1.')
        parser.add_argument('--ORIGIN_PORT', type=str,
                            help='参数表示源数据库PORT,如:3306.')
        parser.add_argument('--TARGET_USER', type=str, default='root',
                            help='参数表示源数据库的用户名,默认为root.')
        parser.add_argument('--TARGET_PASSWORD', type=int,
                            help='参数表示目标数据库的密码.')
        parser.add_argument('--TARGET_IP', type=str,
                            help='参数表示目标数据库IP,如:127.0.0.1.')
        parser.add_argument('--TARGET_PORT', type=str,
                            help='参数表示目标数据库PORT,如:3306.')
        parser.add_argument('--ORIGIN_DATABASE', type=str, default='keystone',
                            help='参数表示源数据库的名称,默认为keystone.')
        parser.add_argument('--TARGET_DATABASE', type=str, default='keystone',
                            help='参数表示目标数据库的名称,默认为keystone.')
        parser.add_argument('--ENDPOINT', type=str,
                            help='该参数表示终端的IP地址,例如http://127.0.0.1:30113.')

        return parser

    def otter_login(self, url=None):
        data = {
            '_fm.l._0.n': 'admin',
            '_fm.l._0.p': 'admin',
            'action': 'user_action',
            'event_submit_do_login': 1
        }
        #
        resp = requests.post(url=url, data=data)
        if not resp.status_code == 200:
            raise Exception(resp.text)
        if not resp.history[0].headers['Set-Cookie']:
            raise Exception('The token is not obtained. Procedure.')
        JSESSIONID = resp.history[0].headers['Set-Cookie'].split(';')
        OTTER_WEBX_JSESSIONID0 = JSESSIONID[2].split(',')
        Set_Cookie = JSESSIONID[0] + ';' + OTTER_WEBX_JSESSIONID0[1]
        self.headers['Cookie'] = Set_Cookie
        if self.headers.get('Cookie') == None:
            raise Exception('No Cookie was obtained.')

    def otter_csrf(self, url=None):
        resp = requests.get(url=url, headers=self.headers)
        if not resp.status_code == 200:
            raise Exception(resp.text)
        html = resp.text
        csrf_token = BeautifulSoup(html)
        csrf_token.input['self.name'] = '_csrf_token'
        res = csrf_token.input
        self._csrf_token = res['value']
        if self._csrf_token == None:
            raise Exception('The CSRF token is not obtained. Procedure.')

    def disponse_otter_yaml(self):
        with open('otter.yaml', 'r') as f:
            otter_yaml = f.read()
            configuration_data = yaml.load(otter_yaml)
            regex = re.compile(
                r'^(?:http|ftp)s?://'  # http:// or https:// or jdc:mysql://
                r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
                r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
                r'(?::\d+)?'  # optional port
                r'(?:/?|[/?]\S+)$', re.IGNORECASE)

            regex_ip = re.compile(
                r'^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}$', re.IGNORECASE)  # ...or ip

            if 'ORIGIN_USER' not in configuration_data or 'ORIGIN_PASSWORD' not in configuration_data \
                    or 'ORIGIN_PORT' not in configuration_data or 'TARGET_USER' not in configuration_data or \
                    'TARGET_PASSWORD' not in configuration_data or 'TARGET_PORT' not in configuration_data or \
                    'ORIGIN_DATABASE' not in configuration_data or 'TARGET_DATABASE' not in configuration_data \
                    or 'ENDPOINT' not in configuration_data:
                raise Exception('''The configuration item variable is changed. Please refer to the template to fill in:
                { 
                    "ORIGIN_USER":"root", 
                    "ORIGIN_PASSWORD":123456, 
                    "ORIGIN_PORT":3306,
                    "TARGET_USER":"root", 
                    "TARGET_PASSWORD":123456, 
                    "TARGET_PORT":3306, 
                    "ORIGIN_DATABASE":"default", 
                    "TARGET_DATABASE":"default", 
                    "ENDPOINT":"http://127.0.0.1:30113" 
                    }''')
            parser = self.get_parser()
            args = parser.parse_args()
            if args.ORIGIN_USER:
                self.origin_user = args.ORIGIN_USER
            elif configuration_data['ORIGIN_USER'] != None:
                self.origin_user = configuration_data['ORIGIN_USER']
            else:
                raise Exception(
                    'The ORIGIN_USER parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')

            if args.ORIGIN_PASSWORD:
                self.origin_password = args.ORIGIN_PASSWORD
            elif configuration_data['ORIGIN_PASSWORD'] != None:
                self.origin_password = configuration_data['ORIGIN_PASSWORD']
            else:
                raise Exception(
                    'The ORIGIN_PASSWORD parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')

            if args.ORIGIN_IP and not regex_ip.match(args.ORIGIN_IP):
                raise Exception(
                    'The ORIGIN IP parameter format is incorrect, for example:127.0.0.1')
            elif args.ORIGIN_IP and regex_ip.match(args.ORIGIN_IP) and args.ORIGIN_PORT:
                self.origin_url = 'jdbc:mysql://' + args.ORIGIN_IP + ':' + args.ORIGIN_PORT
                self.ip_canal = args.ORIGIN_IP + ':' + args.ORIGIN_PORT + ';'
            elif args.ORIGIN_IP and regex_ip.match(args.ORIGIN_IP) and configuration_data['ORIGIN_PORT'] != None:
                self.origin_url = 'jdbc:mysql://' + args.ORIGIN_IP + ':' + str(configuration_data['ORIGIN_PORT'])
                self.ip_canal = args.ORIGIN_IP + ':' + str(configuration_data['ORIGIN_PORT']) + ';'
            else:
                raise Exception(
                    'The ORIGIN PORT parameter was not found or the ORIGIN IP format is incorrect, for example, 127.0.0.1')

            if args.TARGET_USER:
                self.target_user = args.TARGET_USER
            elif configuration_data['TARGET_USER'] != None:
                self.target_user = configuration_data['TARGET_USER']
            else:
                raise Exception(
                    'The TARGET_USER parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')

            if args.TARGET_PASSWORD:
                self.target_password = args.TARGET_PASSWORD
            elif configuration_data['TARGET_PASSWORD'] != None:
                self.target_password = configuration_data['TARGET_PASSWORD']
            else:
                raise Exception(
                    'The TARGET_PASSWORD parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')

            if args.TARGET_IP and not regex_ip.match(args.TARGET_IP):
                raise Exception(
                    'The format of the TARGET IP parameter is incorrect, for example:127.0.0.1')
            elif args.TARGET_IP and regex_ip.match(args.TARGET_IP) and args.TARGET_PORT:
                self.target_url = 'jdbc:mysql://' + args.TARGET_IP + ':' + args.TARGET_PORT
            elif args.TARGET_IP and regex_ip.match(args.TARGET_IP) and configuration_data['TARGET_PORT'] != None:
                self.target_url = 'jdbc:mysql://' + args.TARGET_IP + ':' + str(configuration_data['TARGET_PORT'])
            else:
                raise Exception(
                    'The TARGET PORT parameter was not found or the TARGET IP format is incorrect, such as 127.0.0.1')

            if args.ORIGIN_DATABASE:
                self.origin_database = args.ORIGIN_DATABASE
            elif configuration_data['ORIGIN_DATABASE'] != None:
                self.origin_database = configuration_data['ORIGIN_DATABASE']
            else:
                raise Exception(
                    'The ORIGIN_DATABASE parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')

            if args.TARGET_DATABASE:
                self.target_database = args.TARGET_DATABASE
            elif configuration_data['TARGET_DATABASE'] != None:
                self.target_database = configuration_data['TARGET_DATABASE']
            else:
                raise Exception(
                    'The TARGET_DATABASE parameter could not be found. Check whether this parameter is configured in the runtime or configuration file')

            if args.ENDPOINT and not regex.match(args.ENDPOINT):
                raise Exception(
                    'The format of the ENDPOINT parameter is incorrect, for example:http://127.0.0.1:8081')
            if args.ENDPOINT and regex.match(args.ENDPOINT):
                self.endpoint = args.ENDPOINT
            elif configuration_data['ENDPOINT'] != None and regex.match(
                    configuration_data['ENDPOINT']):
                self.endpoint = configuration_data['ENDPOINT']
            else:
                raise Exception(
                    'No ENDPOINT parameter found or format error, for example:http://127.0.0.1:8081')

    def dispose_name(self):
        master_url = self.origin_url.split(':')
        slave_url = self.target_url.split(':')
        origin_url_name = master_url[2]
        target_url_name = slave_url[2]
        self.name = origin_url_name.split('.')[3] + '_' + target_url_name.split('.')[3]

    def dispose_id(self, name=None, url=None, origin_database_name=None, origin_name=None,
                   target_database_name=None, target_name=None, is_data_origin=False, data_source=False,
                   is_data_table=False, is_canal=False, is_channel=False, is_pipeline=False, is_data_target=False,
                   is_data_origin_table=False, is_data_target_table=False):
        res = requests.get(url=url, headers=self.headers)
        html = res.text
        soup = BeautifulSoup(html)
        datas_table_origin = soup.table
        datas_table = []
        for data in datas_table_origin.stripped_strings:
            datas_table.append(data)
        if data_source or is_canal:
            self.html_table_unify = [i for i in datas_table[6:] if
                                     (i != '查看') and (i != '|') and (i != '编辑') and (i != '删除')]
            self.otter_tables = {
                'dispose_datas': [self.html_table_unify[i:i + 5] for i in
                                  range(0, len(self.html_table_unify), 5)],
                'data_key': ['id', 'name', 'database', 'coding', 'url']}

        elif is_data_table:
            self.html_table_unify = [i for i in datas_table[6:] if
                                     (i != '查看') and (i != '|') and (i != '编辑') and (i != '删除')]
            self.otter_tables = {
                'dispose_datas': [self.html_table_unify[i:i + 5] for i in
                                  range(0, len(self.html_table_unify), 5)],
                'data_key': ['id', 'name', 'table', 'source_name', 'coding']}

        elif is_channel:
            self.html_table_unify = [i for i in datas_table[5:] if
                                     (i != '查看') and (i != '|') and (i != '编辑') and (i != '删除') and (i != '停止') and (
                                             i != '无') and (
                                             i != '启用') and (
                                             i != '推送') and (i != '单向')]
            self.otter_tables = {
                'dispose_datas': [self.html_table_unify[i:i + 2] for i in range(0, len(self.html_table_unify), 2)],
                'data_key': ['id', 'name']}

        elif is_pipeline:
            self.html_table_unify = [i for i in datas_table[10:] if
                                     (i != '查看') and (i != '|') and (i != '编辑') and (i != '删除') and (i != '监控') and (
                                             i != '日志')]
            self.otter_tables = {
                'dispose_datas': [self.html_table_unify[i:i + 7] for i in range(0, len(self.html_table_unify), 7)],
                'data_key': ['id', 'name', 'parallel_data', 'home_site', 'mainstem_state', 'delay_time',
                             'monitoring_num']}

        data_key = self.otter_tables.get('data_key', '')
        data_origin = []
        for data_value in self.otter_tables.get('dispose_datas'):
            data_origin.append(dict(zip(data_key, data_value)))

        for data in data_origin:
            if name == data.get('name', None) and is_data_origin:
                self.origin_id = data.get('id', None)
                self.origin_name = name
                if self.origin_id == None:
                    raise Exception('The data source ID was not obtained. Procedure')
            elif name == data.get('name', None) and is_data_target:
                self.target_id = data.get('id', None)
                self.target_name = name
                if self.target_id == None:
                    raise Exception('The target ID was not obtained.')
            elif origin_database_name == data.get('name', None) and origin_name == data.get('source_name',
                                                                                            None) and is_data_origin_table:
                self.data_origin_table_id = data.get('id', None)
                if self.data_origin_table_id == None:
                    raise Exception('The DATA Origin table ID was not found.')
            elif target_database_name == data.get('name', None) and target_name == data.get('source_name',
                                                                                            None) and is_data_target_table:
                self.data_target_table_id = data.get('id', None)
                if self.data_target_table_id == None:
                    raise Exception('The DATA Target table ID was not found.')
            elif data.get('name', None) == self.canal_name and is_canal:
                self.canal_id = data.get('id', None)
                if self.canal_id == None:
                    raise Exception('The Canal ID was not obtained.')
            elif self.channel_name == data.get('name', None) and is_channel:
                self.channel_id = data.get('id', None)
                if self.channel_id == None:
                    raise Exception('The Channel ID was not obtained')
            elif self.pipeline_name == data.get('name', None) and is_pipeline:
                self.pipeline_id = data.get('id', None)
                if self.pipeline_id == None:
                    raise Exception('Pipeline ID not obtained')

    # def add_zookper(self, IP_ZOOKEEPER=None):
    #     url = 'http://10.121.118.26:30113/add_zookeeper.htm'
    #     data = {
    #         '_csrf_token': self._csrf_token,
    #         'action': 'auto_keeper_cluster_action',
    #         'event_submit_do_add': 1,
    #         '_fm.a._0.c': 'kevin222',
    #         '_fm.a._0.z': IP_ZOOKEEPER,
    #         '_fm.a._0.d': 'add zookeeper.',
    #
    #     }
    #     resp = requests.post(url=url, data=data, headers=self.headers)
    #     if not resp.status_code == 200:
    #         # raise Exception('An internal error occurs on the server when Zookeeper is added')
    #         raise Exception(resp.text)
    #
    # def add_node(self, IP_NODE=None, PORT_NODE=None):
    #     url = 'http://10.121.118.26:30113/add_node.htm'
    #     data = {
    #         '_csrf_token': self._csrf_token,
    #         'action': 'node_action',
    #         'event_submit_do_add': 1,
    #         '_fm.n._0.n': 'kevin',
    #         '_fm.n._0.ip': IP_NODE,
    #         '_fm.n._0.p': PORT_NODE,
    #         '_fm.no._0.u': False,
    #         '_fm.no._0.a': 1,
    #         '_fm.n._0.d': 'add node.'
    #
    #     }
    #     resp = requests.post(url=url, data=data, headers=self.headers)
    #     if not resp.status_code == 200:
    #         raise Exception(resp.text)

    def add_data_origin(self, name=None, mysql_user=None, mysql_password=None, data_url=None, url=None):
        data = {
            '_csrf_token': self._csrf_token,
            'action': 'data_media_source_action',
            'event_submit_do_add': 1,
            '_fm.d._0.n': name,
            '_fm.d._0.t': 'MYSQL',
            '_fm.d._0.u': mysql_user,
            '_fm.d._0.p': mysql_password,
            '_fm.d._0.ur': data_url,
            '_fm.d._0.e': 'UTF8'
        }
        resp = requests.post(url=url, data=data, headers=self.headers)
        if not resp.status_code == 200:
            raise Exception(resp.text)

    def add_data_table(self, origin_database=None, source_name=None, origin_id=None, url=None):
        data = {
            '_csrf_token': self._csrf_token,
            'action': 'data_media_action',
            'event_submit_do_add': 1,
            '_fm.da._0.na': origin_database,
            '_fm.da._0.n': '(.*).(.*)',
            '_fm.da._0.s': source_name,
            '_fm.da._0.so': origin_id
        }
        resp = requests.post(url=url, data=data, headers=self.headers)
        if not resp.status_code == 200:
            raise Exception(resp.text)

    def add_canal(self, name=None, ip_canal=None, mysql_user=None, mysql_password=None, url=None):
        data = {
            '_csrf_token': self._csrf_token,
            'action': 'canal_action',
            'event_submit_do_add': 1,
            '_fm.ca._0.n': name,
            '_fm.can._0.r': 'EMBEDDED',
            '_fm.can._0.a': 1,
            '_fm.can._0.s': 'MYSQL',
            '_fm.can._0.g': ip_canal,
            '_fm.can._0.d': mysql_user,
            '_fm.can._0.db': mysql_password,
            '_fm.can._0.co': 'UTF-8',
            '_fm.can._0.gt': False,
            '_fm.can._0.t': False,
            '_fm.can._0.st': 'MEMORY',
            '_fm.can._0.sto': 'MEMSIZE',
            '_fm.can._0.me': 32768,
            '_fm.can._0.mem': 1024,
            '_fm.can._0.h': 'HEARTBEAT',
            '_fm.can._0.det': False,
            '_fm.can._0.dete': 'insert into retl.xdual values(1,now()) on duplicate key update x=now()',
            '_fm.can._0.detec': 5,
            '_fm.can._0.detect': 30,
            '_fm.can._0.detecti': 3,
            '_fm.can._0.he': False,
            '_fm.can._0.m': 'MIXED',
            '_fm.can._0.in': 'MEMORY_META_FAILBACK',
            '_fm.can._0.po': 11111,
            '_fm.can._0.de': 30,
            '_fm.can._0.re': 16384,
            '_fm.can._0.se': 16384,
            '_fm.can._0.fa': 60,
        }
        resp = requests.post(url=url, data=data, headers=self.headers)
        if not resp.status_code == 200:
            raise Exception(resp.text)
        self.canal_name = name

    def add_channel(self, name=None, url=None):
        data = {
            '_csrf_token': self._csrf_token,
            'action': 'channel_action',
            'event_submit_do_add': 1,
            '_fm.ch._0.n': name,
            '_fm.cha._0.sy': 'BASE',
            '_fm.cha._0.s': 'ROW',
            '_fm.cha._0.e': True,
            '_fm.cha._0.re': 'LOOPBACK',
            '_fm.cha._0.r': 60,
            '_fm.ch._0.d': 'add channel.'
        }
        resp = requests.post(url=url, data=data, headers=self.headers)
        if not resp.status_code == 200:
            raise Exception(resp.text)
        self.channel_name = name

    def add_pipeline(self, name=None, url=None):
        data = {
            '_csrf_token': self._csrf_token,
            'action': 'pipeline_action',
            'event_submit_do_add': 1,
            '_fm.p._0.i': '$pipeline.id',
            '_fm.p._0.c': self.channel_id,
            'channelId': self.channel_id,
            '_fm.p._0.n': name,
            '_fm.p._0.s': 1,
            '_fm.p._0.l': 1,
            '_fm.pi._0.pa': 5,
            '_fm.pi._0.e': 10,
            '_fm.pi._0.l': 15,
            '_fm.pi._0.f': 15,
            '_fm.pi._0.h': False,
            '_fm.pi._0.se': 'Canal',
            '_fm.pi._0.de': self.canal_name,
            '_fm.pi._0.m': 6000,
            '_fm.pi._0.b': -1,
            '_fm.p._0.d': 'add pipeline.'
        }
        resp = requests.post(url=url, data=data, headers=self.headers)
        if not resp.status_code == 200:
            raise Exception(resp.text)
        self.pipeline_name = name

    def add_data_media_pair(self, url=None):
        data = {
            '_csrf_token': self._csrf_token,
            'action': 'data_media_pair_action',
            'event_submit_do_add': 1,
            '_fm.dat._0.pi': self.pipeline_id,
            'pipelineId': self.pipeline_id,
            'channelId': self.channel_id,
            '_fm.dat._0.s': '(.*).(.*)',
            '_fm.dat._0.so': self.data_origin_table_id,
            '_fm.dat._0.t': '(.*).(.*)',
            '_fm.dat._0.ta': self.data_target_table_id,
            '_fm.dat._0.pu': 5,
            '_fm.dat._0.c': 'INCLUDE',
            '_fm.dat._0.fi': 'CLAZZ',
            '_fm.dat._0.r': 'CLAZZ',
            'submitKey': '保存'

        }
        resp = requests.post(url=url, data=data, headers=self.headers)
        if not resp.status_code == 200:
            raise Exception(resp.text)

    def start_sync_task(self, url=None):
        resp = requests.get(url=url,
                            headers=self.headers)
        if not resp.status_code == 200:
            raise Exception(resp.text)
        logging.info(">>>>>>>>>>>>脚本运行成功")


if __name__ == '__main__':
    task = otter_sync()
    task.otter_sync()
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐