修改_Program类

本章主要关于如何把第二章新增的函数加入算法中


1 引入新属性

  • ts_function_set

    时间序列函数集

  • fixed_function_set

    TA-Lib固定参数函数集

  • d_ls

    时间序列函数参数 ‘d’ 可选范围


2 build_program(self, random_state)

这里要简单讲一下,首先解释一下原代码的意思,大家对着代码看,我就不copy过来了。

首先,在随机选定init_method后,我们要生成树的根结点,也就是随机从函数集中抽一个函数。我们把它和它的参数个数(arity)作为第一个对象创建program和terminal_stack这两个列表,这个terminal_stack使用来保证生成的这个树每一层的结点数要和参数个数对得上,不然最后没办法excute。

接下来就是核心部分,我们设计一个循环,只要terminal_stack非空我们就继续加入新的点。在选择点的时候,首先生成一个自然数,小于函数个数与变量个数的和,如果目前的深度没到最大深度,且抽到的随机数比函数个数小或init_method是’full’,那就意味着这个点需要一个函数,于是我们就抽一个放到program的最后,并且在terminal_stack的最后也添加一个此函数的arity。

另一种情况则是这个点需要一个变量或常数,如果提供了常数区间这一属性,那么就随机生成一个小于(变量个数+1)的自然数,然后如果抽到的数等于变量个数则再从常数区间中抽一个常数(float类型)添加到program的最后,否则把这个抽到的数(整型)添加到program的最后;那如果常数区间这一属性为None,则直接抽一个小于变量个数的自然数作为对应变量添加到program的最后。经过以上步骤我们可以发现,program这个列表中有三类:函数、整型(代表对应变量),浮点数(代表常数)。

最后,我们要把terminal_stack的最后一个元素减去1,这是因为这一步中我们确定的是上一层函数所需的参数,那么确定一个参数就少一个待确定的参数。接下来如果减为0了,那意味着这一层函数下面的分支已经全部确定完毕了,于是就要把这个0移除,下一轮循环中抽取最近的有待确定参数的上层函数的参数。最终所有参数都确定完的时候,terminal_stack应该为空,这时候就直接返回program就结束了。


我们需要进行一些调整,新增的时间序列函数与默认函数的区别只在于多了一个参数d,而固定参数函数除了需要参数d外,相当于一个叶子点(参数全部固定,无需抽取),于是我们要分别对待。修改的地方有以下几个:

在第一次生成随机数时,要在默认函数个数与变量个数的和的基础上加上新增的两类函数的个数,判断是否为非固定参数的函数时的界限也要从默认函数个数改为其加上时间序列函数个数;

如果判断为是,则再抽一轮决定是否为默认函数,如果小于默认函数个数则为对应序号的默认函数,否则意味着抽到了第(随机数-默认函数个数)个时间序列函数,还需要随机生成一个参数d,这里封装一下:

def make_ts_function(function, d_ls, random_state):
    """
    Parameters
    ----------
    function: _Function
        时间序列函数.

    d_ls: list
        参数 'd' 可选范围.

    random_state: RandomState instance
        随机数生成器.

    """
    d = random_state.randint(len(d_ls))
    d = d_ls[d]
    function_ = copy(function)
    function_.set_d(d)
    return function_

如果判断为否,则同上我们要在变量、常数之外多加一个固定参数函数的选项,逻辑一样具体就不赘述了。


最后贴一下修改后的代码:

def build_program(self, random_state):
    """Build a naive random program.

    Parameters
    ----------
    random_state : RandomState instance
        The random number generator.

    Returns
    -------
    program : list
        The flattened tree representation of the program.

    """
    if self.init_method == 'half and half':
        method = ('full' if random_state.randint(2) else 'grow')
    else:
        method = self.init_method
    max_depth = random_state.randint(*self.init_depth)

    # Start a program with a function to avoid degenerative programs

    # function = random_state.randint(len(self.function_set))
    function = random_state.randint(len(self.function_set) + len(self.ts_function_set))
    # function = self.function_set[function]
    if function < len(self.function_set):
        function = self.function_set[function]
    else:
        function = make_ts_function(self.ts_function_set[function - len(self.function_set)],
                                    self.d_ls, random_state)

    program = [function]
    terminal_stack = [function.arity]

    while terminal_stack:
        depth = len(terminal_stack)

        # choice = self.n_features + len(self.function_set)
        choice = self.n_features + len(self.function_set) + len(self.ts_function_set) + \
                 len(self.fixed_function_set)

        choice = random_state.randint(choice)

        # Determine if we are adding a function or terminal
        # if (depth < max_depth) and (method == 'full' or
        #                             choice <= len(self.function_set)):
        if (depth < max_depth) and (method == 'full' or
                                    choice < len(self.function_set) + len(self.ts_function_set)):
            function = random_state.randint(len(self.function_set))
            # function = self.function_set[function]
            if choice < len(self.function_set):
                function = self.function_set[function]
            else:
                function = make_ts_function(self.ts_function_set[function - len(self.function_set)],
                                            self.d_ls, random_state)
            program.append(function)
            terminal_stack.append(function.arity)
        else:
            # We need a terminal, add a variable or a fixed function or constant
            if self.const_range is not None:
                # terminal = random_state.randint(self.n_features + 1)
                terminal = random_state.randint(self.n_features + len(self.fixed_function_set) + 1)
            else:
                # terminal = random_state.randint(self.n_features)
                terminal = random_state.randint(self.n_features + len(self.fixed_function_set))
            # if terminal == self.n_features:
            if terminal == self.n_features + len(self.fixed_function_set):
                terminal = random_state.uniform(*self.const_range)
                if self.const_range is None:
                    # We should never get here
                    raise ValueError('A constant was produced with '
                                     'const_range=None.')
            # 新增
            elif terminal >= self.n_features:
                terminal = make_ts_function(self.fixed_function_set[terminal - self.n_features],
                                            self.d_ls, random_state)
            program.append(terminal)
            terminal_stack[-1] -= 1
            while terminal_stack[-1] == 0:
                terminal_stack.pop()
                if not terminal_stack:
                    return program
                terminal_stack[-1] -= 1

    # We should never get here
    return None

3 execute(self, X)

和上一节一样,先简单讲一下原函数。

首先建立一个空列表apply_stack,对于program列表中的每个结点,按顺序从头开始循环。如果该结点为函数,则在apply_stack的最后添加一个只含有该函数的单元素列表;否则该结点为整数或浮点数,那就把这个数添加到apply_stack中最后一个列表的最后,作为这个列表的第一个元素(某函数)的一个参数。

接下来要判断apply_stack中最后一个列表的元素个数,是不是等于它对应的函数应有的参数个数+1,也就是这个分支有没有走到头,如果参数是足够的那就可以计算了:进行一下变换,浮点数扩充为长度为样本个数的向量;整数替换为X的对应列,也有可能是已经计算好的一个向量,直接传入就好了,这样这一个分支就计算完毕了。

这样apply_stack中最后一个列表就可以删掉了,计算得到的向量就传入apply_stack中前一个列表的最后,作为一个参数。

以此类推,从右至左,从下至上地不断合并计算,最终就会得到最终的结果。


同上,我们需要调整的点有两个:首先,对于固定参数的函数,在导入的时候我们就直接把函数结果计算出来,也就是在判断结点是函数的时候,再增加一个条件,判断函数的arity是否为0,如果为0那就是固定参数的函数,这时候直接代入对应参数计算结果,把结果导入apply_stack就好了。

此外,考虑到有时候在变异后可能存在一个program只有一个结点(整数、浮点数或一个固定参数函数),这时候apply_stack是空的,按照原逻辑就会出bug,于是就再加一个条件判断apply_stack是否为空,若为空则直接返回对应结果。

另外,再进行一下说明,我们这里没有对含时间序列参数的函数做特殊化,这是因为在第二章定义_Function类的时候已经考虑过了,会直接把d加入计算。


最后贴一下修改后的代码:

def execute(self, X):
    """Execute the program according to X.

    Parameters
    ----------
    X : {array-like}, shape = [n_samples, n_features]
        Training vectors, where n_samples is the number of samples and
        n_features is the number of features.

    Returns
    -------
    y_hats : array-like, shape = [n_samples]
        The result of executing the program on X.

    """
    # Check for single-node programs
    node = self.program[0]
    if isinstance(node, float):
        return np.repeat(node, X.shape[0])
    if isinstance(node, int):
        return X[:, node]

    apply_stack = []

    for node in self.program:

        if isinstance(node, _Function):
            # 新增
            if node.arity != 0:
                apply_stack.append([node])
            else:
                args_ = [X[:, self.feature_names.index(name)] for name in node.params_need]
                t = node(*args_)
                if apply_stack:
                    apply_stack[-1].append(t)
                else:
                    return t
        else:
            # Lazily evaluate later
            if apply_stack:
                apply_stack[-1].append(node)
            else:
                return np.repeat(node, X.shape[0]) if isinstance(node, float) \
                    else X[:, node]

        while len(apply_stack[-1]) == apply_stack[-1][0].arity + 1:
            # Apply functions that have sufficient arguments
            function = apply_stack[-1][0]
            terminals = [np.repeat(t, X.shape[0]) if isinstance(t, float)
                         else X[:, t] if isinstance(t, int)
            else t for t in apply_stack[-1][1:]]
            intermediate_result = function(*terminals)
            if len(apply_stack) != 1:
                apply_stack.pop()
                apply_stack[-1].append(intermediate_result)
            else:
                return intermediate_result

    # We should never get here
    return None

4 变异

第一章分析原代码的时候讲过,变异一共有4种:交叉、子树变异、hoist变异、点变异,其中只有点变异是需要调整的。

接下来具体说说怎么调整。对于结点,替换函数的时候我们要把普通函数和时间序列函数放在一起,如果抽到了时间序列函数就调用make_ts_function就好了。对于叶子点,我们要把固定参数函数加进来,逻辑很简单看代码就好,不赘述了。


贴一下修改后的代码:

def point_mutation(self, random_state):
    """Perform the point mutation operation on the program.

    Point mutation selects random nodes from the embedded program to be
    replaced. Terminals are replaced by other terminals and functions are
    replaced by other functions that require the same number of arguments
    as the original node. The resulting tree forms an offspring.

    Parameters
    ----------
    random_state : RandomState instance
        The random number generator.

    Returns
    -------
    program : list
        The flattened tree representation of the program.

    """
    program = copy(self.program)

    # Get the nodes to modify
    mutate = np.where(random_state.uniform(size=len(program)) <
                      self.p_point_replace)[0]

    for node in mutate:
        mutated = False
        if isinstance(program[node], _Function):
            arity = program[node].arity
            # 新增条件
            if arity != 0:
                # Find a valid replacement with same arity
                replacement = len(self.arities[arity])
                replacement = random_state.randint(replacement)
                replacement = self.arities[arity][replacement]
                if replacement.is_ts:
                    replacement = make_ts_function(replacement, self.d_ls, random_state)
                program[node] = replacement
                # 新增
                mutated = True
        if not mutated:
            # We've got a terminal, add a const or variable
            if self.const_range is not None:
                # terminal = random_state.randint(self.n_features + 1)
                terminal = random_state.randint(self.n_features + len(self.fixed_function_set) + 1)
            else:
                # terminal = random_state.randint(self.n_features)
                terminal = random_state.randint(self.n_features + len(self.fixed_function_set))
            # if terminal == self.n_features:
            if terminal == self.n_features + len(self.fixed_function_set):
                terminal = random_state.uniform(*self.const_range)
                if self.const_range is None:
                    # We should never get here
                    raise ValueError('A constant was produced with '
                                     'const_range=None.')
                # 新增
                program[node] = terminal
            # 新增
            elif terminal >= self.n_features:
                replacement = self.fixed_function_set[terminal - self.n_features]
                replacement = make_ts_function(replacement, self.d_ls, random_state)
                program[node] = replacement
            else:
                program[node] = terminal

    return program, list(mutate)

5 其他

对于其他函数,有一部分不需要修改,一部分简单地修改,还有一部分废弃了,我简要说明一下吧。

  • 首先是不需要修改的:_length,fitness,reproduce,没什么可说的。

  • 然后是简单修改的:validate_program,__str__,_depth,由于函数类型增加了所以做了一些更改,直接看代码不难理解,不做过多解释了。

def validate_program(self):
    """Rough check that the embedded program in the object is valid."""
    terminals = [0]
    for node in self.program:
        if isinstance(node, _Function):
            # 新增条件
            if node.arity != 0:
                terminals.append(node.arity)
            # 新增
            else:
                terminals[-1] -= 1
                while terminals[-1] == 0:
                    terminals.pop()
                    terminals[-1] -= 1
        else:
            terminals[-1] -= 1
            while terminals[-1] == 0:
                terminals.pop()
                terminals[-1] -= 1
    return terminals == [-1]

def __str__(self):
    """Overloads `print` output of the object to resemble a LISP tree."""
    terminals = [0]
    output = ''
    for i, node in enumerate(self.program):
        if isinstance(node, _Function):
            # 新增条件
            if node.arity != 0:
                terminals.append(node.arity)
                output += node.name + '('
            # 新增
            else:
                output += node.name + '(%s)' % ', '.join(node.params_need)
                terminals[-1] -= 1
                while terminals[-1] == 0:
                    terminals.pop()
                    terminals[-1] -= 1
                    output += ')'
                if i != len(self.program) - 1:
                    output += ', '
        else:
            if isinstance(node, int):
                if self.feature_names is None:
                    output += 'X%s' % node
                else:
                    output += self.feature_names[node]
            else:
                output += '%.3f' % node
            terminals[-1] -= 1
            while terminals[-1] == 0:
                terminals.pop()
                terminals[-1] -= 1
                output += ')'
            if i != len(self.program) - 1:
                output += ', '
    return output

def _depth(self):
    """Calculates the maximum depth of the program tree."""
    terminals = [0]
    depth = 1
    for node in self.program:
        if isinstance(node, _Function):
            # 新增条件
            if node.arity != 0:
                terminals.append(node.arity)
                depth = max(len(terminals), depth)
            # 新增
            else:
                terminals[-1] -= 1
                while terminals[-1] == 0:
                    terminals.pop()
                    terminals[-1] -= 1
        else:
            terminals[-1] -= 1
            while terminals[-1] == 0:
                terminals.pop()
                terminals[-1] -= 1
    return depth - 1
  • 最后是废弃的,逐个说下原因:export_graphviz,我觉得这个功能我不太需要,懒得改了;get_all_indices和_indices,因为引入了时间序列函数,直接抽样是不太行的,我也没有具体思考应该怎么抽样,这个功能对我来说意义也不大就不用了。

相关文章
利用遗传算法进行高频因子挖掘(一)
利用遗传算法进行高频因子挖掘(二)
利用遗传算法进行高频因子挖掘(三)
利用遗传算法进行高频因子挖掘(四)

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐