Skip to content

上位机与下位机间的通信实现

基本概念

上位机和下位机的概念常出现在嵌入式领域,但并不仅限于嵌入式领域,它同样适用于其他分布式计算系统中的不同层次和角色的设备。上位机和下位机的划分主要是根据其在分布式系统中的角色和功能来决定的。

上位机

(PC/Host Computer、Master Computer 或 Upper Computer)是指控制和协调整个系统的计算机或设备。它通常运行着高级应用程序,并负责处理和管理数据、与用户交互以及与下位机通信,可用于控制或监视另一台或多台下位机。上位机可以是桌面计算机、服务器或其他高级计算设备。

下位机

(Peripheral Device、Slave Computer 或 Lower Computer)是指与上位机连接的终端设备(如传感器、执行器、控制器等)。它通常负责执行具体的硬件操作和数据处理任务,如传感器数据采集、控制指令的执行等。下位机可以是微控制器、传感器、执行器或其他低级计算设备。

与下位机相比,上位机通常具有更强大的计算和处理能力,并且能够处理和分析从下位机中获取的数据,实现更高级、更智能的应用。例如在工业自动化应用中,通过上位机实现包括工业控制、机器人控制、实验室测量、过程监控,以及高级的数据分析和可视化等功能,帮助用户更好地理解下位机数据并作出更明智的决策。

分布式系统

(Distributed System)这是一个或许有些相近的概念,选取MIT6.824对这个概念的解释。

当人们确实需要: - 并行性 - 容错性 Fault Tolerance - 可用 availability - 可恢复 Recoverability - 多台物理实体 - 安全性security /孤立性 isolated - 高性能 performance 的时候,才不得不去设计并实现一个分布式系统,这会有许多挑战: - 系统部分的故障 - 通讯的故障 - 数据一致性 Consistency 一般的,被连接起来的是许多计算机,用来构建大型的网站服务。 现有的理论与实践聚焦于: - 存储 - 通信 - 计算 常用的实践包括: - RPC(原理参考 - Threads - Concurrency - 非易失性存储 - 副本管理

黑色生死恋,完整而强大()

通讯过程

上位机和下位机之间通过通信接口或协议进行数据交换和控制信息传递。上位机指示下位机执行特定任务,并接收下位机返回的数据或状态信息。上位机和下位机的合作使得整个系统可以实现复杂的功能和任务。

至于具体如何通讯,一般取决于下位机提供的通讯接口,比如 RS-232、RS-485、以太网、USB,或者是 WiFi、蓝牙之类的无线通信方式。除了通讯接口,下位机还需要具有可靠的、可扩展的通讯协议,比如 Modbus、CAN 等。具体到不同行业,例如仪器仪表、工业控制、汽车行业等,还可能会有特定的标准协议。

需求

需要为我的机器人任务规划系统完成上位机-下位机的通讯框架,上位机和下位机都可能有多个,我需要实现多对多的通讯,保证信息可靠到达,支持配置多种通讯协议,并且预留大数据量,高实时性的拓展可能。

对于进一步开发而言,嵌入式相关平台其实支持最好的语言是c,而且也有必要基于不同的情况使用不同的传输层协议,对不同的请求也要支持快速配置的分类处理才好。

总结一下的话:

  1. 实时性:一部分数据(如地图)需要在短时间内传输和处理,延迟要尽可能低。
  2. 异构平台通信:不同平台和系统之间需要进行无缝通信,可能涉及不同的编程语言、操作系统和硬件架构。
  3. 扩展性:系统需要能快速配置不同的传输层通信协议,配置应用层处理协议,易于添加新的通信连接。
  4. 可靠性和容错性:对于高优先级的部分数据(如操作序列),有必要保障数据传输的可靠。
  5. 安全性:需要安全的鉴权注册,重要的数据应当以密文传输(例如伪装发件人攻击让机器误以为自己在其他地方或者错误地遵循指令。。画面不要太美)

设计

为了通用,c和python的版本都要有; 实时性和可靠性,能够选择用TCP还是UDP就可以,具体怎么选可以通过设置某种请求的优先级来决定; 扩展性,可以参考常见网络框架中的 路由器(Router)概念,将传入的请求分发到适当的处理器(或控制器),支持配置合适的回调函数就可以了; 安全性,需要的时候加个SSL。

目标是像个胶水,能把不同协议粘起来就行,首先自己实现对字节流的封装和解封装,然后可以定义自己的package,要求发送“协议名identifier-内容”,接下来客户端和服务端有一个map能映射到相应的handler函数来处理内容。

参考魔晶工作室的异步通信项目Pyrite,非常感谢维护者的耐心答疑😆

handler的设计

#include "package.h"

#include <mocutils/log.h>
//头文件的具体实现
//构造函数
prt::package::package() {
    this->sequence = 0;
    this->identifier = "";
    this->body = prt::bytes(0);
}

prt::package::package(prt::bytes raw) {
    this->sequence = raw.next_int32();
    this->identifier = raw.next_string();
    this->body = raw.range(raw.ptr, raw.size());
}

prt::package::package(package *old) {
    assert(old);
    this->sequence = old->sequence;
    this->identifier = old->identifier;
    this->body = old->body;
}

prt::package::package(i32 sequence, std::string identifier, bytes body) {
    this->sequence = sequence;
    this->identifier = identifier;
    this->body = body;
}

//将sequence信息转换成字节和body信息拼接
prt::bytes prt::package::to_bytes() {
    bytes ret(4);
    for (int i = 0; i < 4; i++)
        ret[i] = (moc::byte)(this->sequence >> (i * 8));
    ret = ret + bytes(this->identifier);
    return ret + this->body;
}

bool prt::package::operator==(const prt::package &other) {
    if (other.sequence != this->sequence) return false;
    if (other.identifier != this->identifier) return false;
    if (other.body != this->body) return false;
    return true;
}

bool prt::package::operator!=(const prt::package &other) {
    return !(this->operator==(other));
}

void prt::package::set_body(std::string text) {
    this->body = bytes(text);
}

//调用bytes对象的转换字符串方法
std::string prt::package::body_as_string() {
    return this->body.to_string();
}

//发包函数
void prt::package::send_to(int socket_fd, sockaddr_in socket_addr) {
    //将当前的 package 对象转换为字节序列
    bytes pkg_bytes = this->to_bytes();
    //检查生成的字节序列是否超出了最大传输大小,
    //如果 pkg_bytes 的大小超出限制,
    //则调用 moc::panic("content overflowed"),引发一个异常或终止程序,防止数据溢出错误。
    if (pkg_bytes.size() > prt::max_transmit_size)
        moc::panic("content overflowed");

    char msg[pkg_bytes.size()];
    for (int i = 0; i < pkg_bytes.size(); i++)
        msg[i] = pkg_bytes[i];

    //基于 POSIX 标准的 Berkeley sockets API,
    //通常用于网络编程,特别是使用UDP协议
    //用于通过套接字发送数据包到指定的网络地址。
    //这个函数在几乎所有的 Unix-like 操作系统(如Linux、macOS)和Windows的网络编程中都可以使用。
    sendto(socket_fd, msg, pkg_bytes.size(), 0, (struct sockaddr *)&socket_addr, sizeof(socket_addr));
    //参数与返回值说明:
    //sockfd:一个套接字的文件描述符,指定要通过哪个套接字发送数据。
    //buf:一个指向要发送的数据的指针。
    //len:要发送的数据的长度(以字节为单位)。
    //flags:发送操作的标志位,通常设置为0以使用默认行为。
    //dest_addr:一个指向目的地址的 sockaddr 结构体的指针,这里指定了数据包的目标网络地址。
    //addrlen:dest_addr 结构体的长度(通常使用 sizeof(struct sockaddr) 来获取)。
    //sendto 返回发送的字节数,如果发生错误,返回 -1,并设置 errno 以指示错误类型。
}

bool prt::package::operator<(prt::package &other) {
    auto b1 = this->to_bytes();
    auto b2 = other.to_bytes();
    int l = b1.size();
    if (b2.size() < l)
        l = b2.size();
    for (int i = 0; i < l; i++)
        if (b1[i] != b2[i])
            return b1[i] < b2[i];
    if (b2.size() > b1.size())
        return true;
    return false;
}
#ifndef _PRT_CLIENT_H
#define _PRT_CLIENT_H

#include <mocutils/channel.h>

#include "define.h"
#include "functional"
#include "map"
#include "package.h"

namespace prt {
class client {
    //server的socket文件描述符,地址
    int server_fd;
    sockaddr_in server_addr;
    //路由
    std::map<std::string, std::function<bytes(bytes)>> router;
    int sequence;

    std::map<int, moc::channel<prt::package> *> promise_buf;

 public:
 //连接状态
    connection_state state;
    client(const char *ip, int port);
    ~client();
//流程函数
    void start();
    void async();
    //设置处理函数
    bool set_handler(std::string identifier, std::function<bytes(bytes)> handler);
    static void *process(void *_args);
    void tell(std::string identifier, bytes body);
    bytes promise(std::string identifer, bytes body);
};
}    // namespace prt

#endif
#include "client.h"

#include <mocutils/log.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <cstring>

#include "pthread.h"

prt::client::client(const char *ip, int port) {
#ifdef Windows
    WSADATA data;
    if (WSAStartup(MAKEWORD(2, 2), &data))
        moc::panic("WSAStartup failed.");
#endif
    //初始状态关闭
    this->state = prt::closed;
    //尝试建立一个套接字
    if ((this->server_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
        moc::warnf("Pyrite connect creation failed. Server addr: %s:%d.", ip, port);
        return;
    }

    //地址初始化空间
    memset(&this->server_addr, 0, sizeof(this->server_addr));

    //指定地址家族为 AF_INET,表示使用 IPv4
    this->server_addr.sin_family = AF_INET;

    //将端口号转换为网络字节序,并存储在 sin_port 中。htons 是 "host to network short" 的缩写,
    //将主机字节序(通常是小端序)转换为网络字节序(大端序)
    this->server_addr.sin_port = htons(port);

    //将 IP 地址从点分十进制字符串转换为一个 32 位的二进制数
    this->server_addr.sin_addr.s_addr = inet_addr(ip);

    //初始化sequence长度
    this->sequence = 0;

    //现在client准备好进行通信
    this->state = prt::established;
}

prt::client::~client() {
    close(this->server_fd);
#ifdef Windows
    WSACleanup();
#endif
}

void prt::client::start() {
    int recv_len;
    socklen_t l = sizeof(this->server_addr);
    pthread_t tid;
    char buf[prt::max_transmit_size];

    while (true) {
        recv_len = recvfrom(this->server_fd, buf, prt::max_transmit_size, 0, (struct sockaddr *)&this->server_addr, &l);
        if (recv_len < 0) {
            //使用 static int counter 来跟踪连续失败的次数,counter 每次加 1,循环到 32 时重新开始。
            static int counter;
            counter %= 32;
            if (++counter) continue;
            moc::warnf("invalid recv_len: %d.", recv_len);
            continue;
        }
        process_args *args = new process_args;
        args->ptr = this;
        //数据拷贝,每次新接收数据buf也会装入新数据,所以这里是安全的
        args->pkg = prt::package(prt::bytes(buf, recv_len));

        //创建线程处理接受数据,避免主线程阻塞
        pthread_create(&tid, NULL, this->process, (void *)args);
    }
}

namespace prt {
void *client_async_runner(void *args) {
    client *c = (client *)args;
    c->start();
    return nullptr;
}
};  // namespace prt

void prt::client::async() {
    pthread_t tid;
    pthread_create(&tid, NULL, client_async_runner, (void *)this);
}

//用于设置一个处理程序(handler),该处理程序会根据数据包的标识符(identifier)来处理接收到的数据包。
//处理程序是一个函数对象(std::function),它接受一个 bytes 对象作为输入,并返回一个 bytes 对象作为输出。
bool prt::client::set_handler(std::string identifier, std::function<bytes(bytes)> handler) {
    //检查标识符是否以 "prt-" 开头
    if (identifier.find("prt-") == 0)
        return false;
    this->router[identifier] = handler;
    return true;
}

//process 函数用于处理接收到的数据包。
//这个函数会在一个独立的线程中执行,负责解析数据包并调用适当的处理程序,同时发送回应(如果需要)
void *prt::client::process(void *_args) {
    //检查参数是否不为空,提取参数,释放空间
    assert(_args);
    process_args *args = (process_args *)_args;
    prt::client *client_ptr = (prt::client *)args->ptr;
    prt::package recv_pkg = args->pkg;
    delete args;

    // process prt ack
    //如果 client_ptr->promise_buf 中存在对应的序列号缓冲区,将数据包 recv_pkg 写入该缓冲区,处理完毕
    if (recv_pkg.identifier == "prt-ack") {
        if (client_ptr->promise_buf[recv_pkg.sequence])
            *client_ptr->promise_buf[recv_pkg.sequence] << recv_pkg;
        return nullptr;
    }

    //检查 recv_pkg.identifier 是否在 client_ptr->router 中存在。
    //如果不存在,将标识符设置为 "*",表示通配符匹配。
    if (!client_ptr->router.count(recv_pkg.identifier))
        recv_pkg.identifier = "*";
    //如果 client_ptr->router 中仍然没有对应的标识符,返回 nullptr,表示没有适当的处理程序。
    if (!client_ptr->router.count(recv_pkg.identifier))
        return nullptr;
    //调用与标识符关联的处理程序,将数据包的 body 作为参数传入,获取处理结果 reply。
    prt::bytes reply = client_ptr->router[recv_pkg.identifier](recv_pkg.body);

    //如果处理结果不为空,创建一个回应数据包 reply_pkg,将 reply 作为回应体,并设置标识符为 "prt-ack"
    if (!reply.size())
        return nullptr;
    prt::package reply_pkg(recv_pkg.sequence, "prt-ack", recv_pkg.body);
    reply_pkg.send_to(client_ptr->server_fd, client_ptr->server_addr);
    return nullptr;
}

//tell 函数用于发送一种无需响应的数据包到服务器,不需要等待服务器的回应。
void prt::client::tell(std::string identifier, bytes body) {
    prt::package pkg(-1, identifier, body);
    pkg.send_to(this->server_fd, this->server_addr);
}

//promise发送请求后会等待服务器的回应。该函数会阻塞直到接收到回应,并返回回应的数据体。
prt::bytes prt::client::promise(std::string identifer, bytes body) {
    int seq = this->sequence++;
    prt::package pkg(seq, identifer, body);
    pkg.send_to(this->server_fd, this->server_addr);

    //channel 管道,用于等待和接收来自服务器的回应
    //在mocutils中实现
    this->promise_buf[seq] = makeptr(channel, prt::package);
    prt::package reply;
    *this->promise_buf[seq] >> reply;
    delete this->promise_buf[seq];
    return reply.body;
}

项目地址

(待继续更新)