本文讲述OPC UA Server如何监测(monitor)变量值,变量属于variable节点。


一 监测的目的

为什么要监测变量值呢?如果想去查看某变量的当前值是不是发生变化,我们第一想到的方式是使用poll的方式,查不到这个值就一直在那等着,而且很有可能多次查到的值都是没变化的,这样比较浪费时间而且效率不高。

监测变量值则可以让我们不用去主动poll,当变量值发生变化时会主动进行通知,当收到通知后我们再去执行相应的代码。下面让我们来看下如何监测(整体源码在第五节


二 添加一个变量

我们先在Server端添加一个变量,这个变量后面会被监测,代码如下,细节也可以参考这篇文章,略有不同,

// 使用string来定义变量节点的node id
UA_NodeId addTheAnswerVariable(UA_Server *server) 
{
    /* Define the attribute of the myInteger variable node */
    UA_VariableAttributes attr = UA_VariableAttributes_default;
    UA_Int32 myInteger = 1;
    UA_Variant_setScalar(&attr.value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
    attr.description = UA_LOCALIZEDTEXT("en-US","the answer");
    attr.displayName = UA_LOCALIZEDTEXT("en-US","the answer");
    attr.dataType = UA_TYPES[UA_TYPES_INT32].typeId;
    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;

    /* Add the variable node to the information model */
    UA_NodeId theAnswerNodeId = UA_NODEID_STRING(1, "the.answer");
    UA_QualifiedName myIntegerName = UA_QUALIFIEDNAME(1, "the answer");
    UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
    UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
    UA_Server_addVariableNode(server, theAnswerNodeId, parentNodeId,
                              parentReferenceNodeId, myIntegerName,
                              UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), attr, NULL, NULL);
    
    return theAnswerNodeId;
}

该变量的node id是string格式,对外显示的名字叫“the answer”,初始值为1,添加完毕后函数返回变量的node id


三 Server端创建监测项

首先让我们理清一个概念:监测项和被监测变量。这是2个不同的东西,监测项可以看做是监工,被监测变量可以看做是工人,监工监视工人干活。所以,监测项是用来监测被监测变量的。

下面看下server端如何给变量节点添加监测项,

void dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId,
                               void *monitoredItemContext, const UA_NodeId *nodeId,
                               void *nodeContext, UA_UInt32 attributeId,
                               const UA_DataValue *value) 
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Received Notification");
    
    UA_NodeId * targetNodeId = (UA_NodeId*)monitoredItemContext;

    if (monitoredItemId == monid && UA_NodeId_equal(nodeId, targetNodeId))
    {
        UA_Int32 currentValue = *(UA_Int32*)(value->value.data);
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Current Value: %d\n", currentValue);
        
    }
}


UA_UInt32 addMonitoredItemToVariable(UA_Server *server, UA_NodeId *pTargetNodeId) 
{
    UA_MonitoredItemCreateResult result;
    
    UA_MonitoredItemCreateRequest monRequest = UA_MonitoredItemCreateRequest_default(*pTargetNodeId);
    
    monRequest.requestedParameters.samplingInterval = 100.0; // 100 ms interval
    result = UA_Server_createDataChangeMonitoredItem(server, UA_TIMESTAMPSTORETURN_BOTH,
                                            monRequest, (void*)pTargetNodeId, dataChangeNotificationCallback);
    
    if (result.statusCode == UA_STATUSCODE_GOOD)
    {
        
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Add monitored item for variable, OK.");
        return result.monitoredItemId;
    }
    else
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Add monitored item for variable, Fail.");
        return -1;
    }
}

函数dataChangeNotificationCallback()分析:

  1. 该函数是变量值发生改变时的回调函数,这个函数的参数格式是固定的,只要是OPC UA Server监测变量的回调函数(client端不太一样),它的原型都必须按照这样写,可以看做是一个模板
  2. if语句是用来确定是否是我们想要的被监测变量,第一个是判断monitoredItemId,这个id是监测项的id;第二个是判断nodeId,这个比较好理解,就是被监测变量自身的node id

函数addMonitoredItemToVariable()分析:

  1. 使用类型UA_MonitoredItemCreateRequest生成监测项的创建请求,并指定被监测变量的id,修改采样时间为100ms(默认值是250ms),所谓采样时间就是Server每隔多少时间去查看下被监测变量值
  2. 使用UA_Server_createDataChangeMonitoredItem()去创建监测变量,并传入回调函数dataChangeNotificationCallback(),UA_TIMESTAMPSTORETURN_BOTH是指返回时间戳类型,暂时不需要,就使用这个值就可以了,
  3. 另外还以void*类型还传入了被监测变量的id,这是为了在回调函数里使用monitoredItemContext来重新获取到被监测变量的id
  4. 最后函数返回监测项的id

PS:addMonitoredItemToVariable()可以调用多次来监测多个变量节点。


四 修改被监测变量的值

只有变量值发生改变才会调用上节的回调函数,所以这里添加一个线程去修改变量值。

void *thr_fn(void *arg)
{
    static UA_Int32 update = 2; // 这是静态变量哦
    
    container_t *pContainer = (container_t*)arg;
    UA_Server *server = pContainer->ptr1;
    UA_NodeId *targetNodeId = pContainer->ptr2;
    
    while (1)
    {
        sleep(2);
        UA_Variant myVar;
        UA_Variant_init(&myVar);
        UA_Variant_setScalar(&myVar, &update, &UA_TYPES[UA_TYPES_INT32]);
        UA_Server_writeValue(server, *targetNodeId, myVar);
        
        update++;
        
        if (update == 100)
        {
            update = 2;
        }
    }

    
    return NULL;
}

每隔2s就去把变量值加1,加到100再从1开始,修改变量值是调用UA_Server_writeValue()方法。代码中的targetNodeId是个全局变量,其值是调用addTheAnswerVariable()返回的变量节点id。


五 整体代码及验证

// server.c

/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
 * See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */

#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#include "open62541.h"

UA_Boolean running = true;

void stopHandler(int sign) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
    running = false;
}

UA_UInt32 monid = 0;

typedef struct container {
	UA_Server * ptr1;
	UA_NodeId * ptr2;
} container_t;



UA_NodeId addTheAnswerVariable(UA_Server *server) 
{
    /* Define the attribute of the myInteger variable node */
    UA_VariableAttributes attr = UA_VariableAttributes_default;
    UA_Int32 myInteger = 1;
    UA_Variant_setScalar(&attr.value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
    attr.description = UA_LOCALIZEDTEXT("en-US","the answer");
    attr.displayName = UA_LOCALIZEDTEXT("en-US","the answer");
    attr.dataType = UA_TYPES[UA_TYPES_INT32].typeId;
    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;

    /* Add the variable node to the information model */
    UA_NodeId theAnswerNodeId = UA_NODEID_STRING(1, "the.answer");
    UA_QualifiedName myIntegerName = UA_QUALIFIEDNAME(1, "the answer");
    UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
    UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
    UA_Server_addVariableNode(server, theAnswerNodeId, parentNodeId,
                              parentReferenceNodeId, myIntegerName,
                              UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), attr, NULL, NULL);
    
    return theAnswerNodeId;
}


void dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId,
                               void *monitoredItemContext, const UA_NodeId *nodeId,
                               void *nodeContext, UA_UInt32 attributeId,
                               const UA_DataValue *value) 
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Received Notification");
    
    UA_NodeId * targetNodeId = (UA_NodeId*)monitoredItemContext;

    if (monitoredItemId == monid && UA_NodeId_equal(nodeId, targetNodeId))
    {
        UA_Int32 currentValue = *(UA_Int32*)(value->value.data);
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Current Value: %d\n", currentValue);
        
    }
}


UA_UInt32 addMonitoredItemToVariable(UA_Server *server, UA_NodeId *pTargetNodeId) 
{
    UA_MonitoredItemCreateResult result;
    
    UA_MonitoredItemCreateRequest monRequest = UA_MonitoredItemCreateRequest_default(*pTargetNodeId);
    
    monRequest.requestedParameters.samplingInterval = 100.0; // 100 ms interval
    result = UA_Server_createDataChangeMonitoredItem(server, UA_TIMESTAMPSTORETURN_BOTH,
                                            monRequest, (void*)pTargetNodeId, dataChangeNotificationCallback);
    
    if (result.statusCode == UA_STATUSCODE_GOOD)
    {
        
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Add monitored item for variable, OK.");
        return result.monitoredItemId;
    }
    else
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Add monitored item for variable, Fail.");
        return -1;
    }
}


void *thr_fn(void *arg)
{
    static UA_Int32 update = 2;
    container_t *pContainer = (container_t*)arg;
    UA_Server *server = pContainer->ptr1;
    UA_NodeId *targetNodeId = pContainer->ptr2;
    
    while (1)
    {
        sleep(2);
        UA_Variant myVar;
        UA_Variant_init(&myVar);
        UA_Variant_setScalar(&myVar, &update, &UA_TYPES[UA_TYPES_INT32]);
        UA_Server_writeValue(server, *targetNodeId, myVar);
        
        update++;
        
        if (update == 100)
        {
            update = 2;
        }
    }

    
    return NULL;
}



int main(void) 
{    
    signal(SIGINT, stopHandler);
    signal(SIGTERM, stopHandler);

    UA_Server *server = UA_Server_new();
    UA_ServerConfig_setDefault(UA_Server_getConfig(server));
    
    UA_NodeId targetNodeId = addTheAnswerVariable(server);
	monid = addMonitoredItemToVariable(server, &targetNodeId);
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Monitored item id: %d\n", monid);
    
    container_t cont = {server, &targetNodeId};
    pthread_t tid;
    int err = pthread_create(&tid, NULL, thr_fn, &cont);
    if (err != 0)
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Create thread Fail.");
        return -1;
    }

    UA_StatusCode retval = UA_Server_run(server, &running);
    
    UA_Server_delete(server);
    
    return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}

代码工程的整体结构如下,
在这里插入图片描述
CMakeLists.txt内容如下,

cmake_minimum_required(VERSION 3.5)

project(demoOpen62541)

set (EXECUTABLE_OUTPUT_PATH  ${PROJECT_SOURCE_DIR}/bin)

add_definitions(-std=c99)

include_directories(${PROJECT_SOURCE_DIR}/open62541)

add_executable(server ${PROJECT_SOURCE_DIR}/src/server.c ${PROJECT_SOURCE_DIR}/open62541/open62541.c)

target_link_libraries(server pthread)

cd到build执行"cmake … && make",编译ok后去bin目录下运行server,可以看到有如下打印,
在这里插入图片描述
表示监测OK


六 监测多个变量值

这里只给出例子,理解了前面的内容,这里就很简单了。

// server.c

/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
 * See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */

#include <signal.h>
#include <stdlib.h>
#include <unistd.h>

#include "open62541.h"

UA_Boolean running = true;

void stopHandler(int sign) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
    running = false;
}


// 添加一个INT32的变量,并设置初始值
UA_NodeId addVariableToObjectsFolder(UA_Server *server, const char * name, 
                                    UA_NodeId nodeId, UA_Int32 initValue) 
{
    /* Define the attribute of the myInteger variable node */
    UA_VariableAttributes attr = UA_VariableAttributes_default;
    UA_Int32 myInteger = initValue;
    UA_Variant_setScalar(&attr.value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
    attr.description = UA_LOCALIZEDTEXT("en-US", (char*)name);
    attr.displayName = UA_LOCALIZEDTEXT("en-US", (char*)name);
    attr.dataType = UA_TYPES[UA_TYPES_INT32].typeId;
    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;

    /* Add the variable node to the information model */
    UA_QualifiedName myIntegerName = UA_QUALIFIEDNAME(1, (char*)name);
    UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
    UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
    UA_Server_addVariableNode(server, nodeId, parentNodeId,
                              parentReferenceNodeId, myIntegerName,
                              UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), attr, NULL, NULL);
    
}


void dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId,
                               void *monitoredItemContext, const UA_NodeId *nodeId,
                               void *nodeContext, UA_UInt32 attributeId,
                               const UA_DataValue *value) 
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Received Notification");
    
    UA_NodeId * targetNodeId = (UA_NodeId*)monitoredItemContext;
    
    // 检查nodeId是否相等
    if (UA_NodeId_equal(nodeId, targetNodeId))
    {
        // 检查数据类型是否是INT32
        if (value->value.type == &UA_TYPES[UA_TYPES_INT32])
        {
            UA_Int32 currentValue = *(UA_Int32*)(value->value.data);
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Current Value: %d\n", currentValue);
        }
    }
}


UA_UInt32 addMonitoredItemToVariable(UA_Server *server, UA_NodeId *pTargetNodeId) 
{
    UA_MonitoredItemCreateResult result;
    
    UA_MonitoredItemCreateRequest monRequest = UA_MonitoredItemCreateRequest_default(*pTargetNodeId);
    
    monRequest.requestedParameters.samplingInterval = 100.0; // 100 ms interval
    result = UA_Server_createDataChangeMonitoredItem(server, UA_TIMESTAMPSTORETURN_BOTH,
                                            monRequest, (void*)pTargetNodeId, dataChangeNotificationCallback);
    
    if (result.statusCode == UA_STATUSCODE_GOOD)
    {
        
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Add monitored item for variable, OK.");
        return result.monitoredItemId;
    }
    else
    {
        UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Add monitored item for variable, Fail.");
        return -1;
    }
}


static void updateValueCallback(UA_Server *server, void *data)
{
    UA_NodeId * ptr = (UA_NodeId *)data; // address of nodeIdArray in main()
    for (uint32_t i = 0; i < 5; ++i)
    {
        UA_Variant myVar;
        UA_Variant_init(&myVar);
        
        // read first
        UA_Server_readValue(server, ptr[i], &myVar);
        if (myVar.type == &UA_TYPES[UA_TYPES_INT32])
        {
            // new value is (old value + 2)
            UA_Int32 update = *(UA_Int32*)(myVar.data) + 2;
            
            UA_Variant_init(&myVar);
            UA_Variant_setScalar(&myVar, &update, &UA_TYPES[UA_TYPES_INT32]);
            UA_Server_writeValue(server, ptr[i], myVar);
        }
    }
}

int main(void) 
{    
    signal(SIGINT, stopHandler);
    signal(SIGTERM, stopHandler);

    UA_Server *server = UA_Server_new();
    UA_ServerConfig_setDefault(UA_Server_getConfig(server));
    
    UA_NodeId nodeIdArray[5] = {};
    
    
    for (uint32_t i = 0; i < 5; ++i)
    {
        char varName[20];
        
        nodeIdArray[i] = UA_NODEID_NUMERIC(1, 8000+i);
        
        snprintf(varName, 20, "answer%d", i);
        UA_Int32 initValue = i*100+1;
        addVariableToObjectsFolder(server, varName, nodeIdArray[i], initValue);
        
        UA_UInt32 monid = addMonitoredItemToVariable(server, &nodeIdArray[i]);
        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, 
                    "Monitored item id: %d, var id: %d, init value: %d\n", 
                        monid, 8000+i, initValue);
    }
        
    // call updateValueCallback() every 2s
    UA_UInt64 callbackId = 0;
    UA_Server_addRepeatedCallback(server, updateValueCallback, &nodeIdArray[0], 2000, &callbackId);

    UA_StatusCode retval = UA_Server_run(server, &running);
    
    UA_Server_delete(server);
    
    return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}

CMakeLists.txt不变,直接编译就行,运行OK,
在这里插入图片描述
红色是分隔线


七 总结

本文主要讲述如何在OPC UA Server端监测变量节点的值,这种方法可以让我们编写异步代码,提高效率。

比较重要的地方是监测的周期和变量变化的周期,需要根据实际情况进行设置,一定要保证监测周期小于变量变化的周期,这样才可以对变量的变化进行及时的响应。

如果有写的不对的地方,希望能留言指正,谢谢阅读。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐