在《一种特殊的栈破坏崩溃问题》中我们根据实际项目遇到的问题总结了一种破坏栈区导致的错误问题,对于此问题,上面文章总有点表述不清楚的感觉,本文基于基本知识点来梳理此问题,其目的是简单易懂的说明这个问题
首先我们需要知道aapcs中的如下表述,我们只需要看B.4
当符合类型超过16字节的时候,aapcs会启用x19保存加载内存的指针。那么此问题要出现,我们原始结构体应该是小于16字节的结构体。
那么需要具备条件的结构体如下
struct kernel{ int x; int y; int z; int s; };
我们知道栈的对齐是16字节,实际上,对于值传递寄存器保存的值,也需要16字节对齐。 我们对test函数反汇编,那么如下
In file: /tmp/test.c:33 28 } 29 30 void test(struct kernel k) 31 { 32 struct user* s = (struct user*)&k; ► 33 s->o4 = 3; 34 test1(s); 35 return ; 36 } 37 38 int main(int argc, char *argv[]) ───────────────────────────────────────────────────────────────────────────────────────[ STACK ]─────────────────────────────────────────────────────────────────────────────────────── 00:0000│ x29 sp 0x7ffffff2f0 —▸ 0x7ffffff320 —▸ 0x7ffffff380 ◂— 0 01:0008│ 0x7ffffff2f8 —▸ 0x4006e8 (main+108) ◂— mov w0, #0 02:0010│ x0 0x7ffffff300 ◂— 0x20000000b /* '\x0b' */ 03:0018│ 0x7ffffff308 ◂— 3 04:0020│ 0x7ffffff310 ◂— 1 05:0028│ 0x7ffffff318 —▸ 0x7ffffff300 ◂— 0x20000000b /* '\x0b' */ 06:0030│ 0x7ffffff320 —▸ 0x7ffffff380 ◂— 0
我们看到栈顶是0x7ffffff2f0,那么先保存x29和x30之后,其他剩下的寄存器需要保存值的起始地址应该是0x7ffffff300
所以,我们知道k变量的默认地址在0x7ffffff300,因为k的大小是16字节,那么0x7ffffff310应该就是下一个局部变量的地址 。0x7ffffff308是k结构体的结束。如果k的结构体不是16字节,那么也会按照16字节对齐到0x7ffffff310地址上
但是我们看到的0x7ffffff310并不是下一个局部变量的地址,而0x7ffffff318是下一个局部变量s的地址
这里的原因是,从0x7ffffff310到0x7ffffff318共8个字节。
这里gcc的实现故意在此情况添加了8个字节的padding。
也就是说,默认情况下,值传递不超过16字节时,那么默认分配的栈来保存寄存器值的空间是16+8等于24字节
我们知道0x7ffffff318的后面是下一个局部变量的分配地址,那么0x7ffffff318就是s的地址。
那么如果正常访问,从k的地址0x7ffffff300开始,只能访问
0x7ffffff300/0x7ffffff304/0x7ffffff308/0x7ffffff30c
那么如果想要访问到0x7ffffff318,那么需要再访问3个4字节,所以,假设强制类型转换为
struct user{ int x; int y; int z; int s; int o1; int o2; int o3; };
此时访问s->o3,则会访问到0x7ffffff318地址,而0x7ffffff318地址又是指针s在栈区的地址,那么破坏了s的地址,程序访问出现段错误。
本文更清晰的介绍了这个栈破坏的问题。助于理解
之前的内容已经能够在基于开源组件mosquitto 来进行mqtt的演示和利用了,但是一个产品功能并不是简单使用使用就完事儿了,所以需要开发。 而开发又分为两部分: 一个是协议栈开发,也就是针对官方协议文档去实现这份协议框架 另一个是应用开发,也就是针对协议栈进行产品应用场景的代码开发
这里我从网上找到了别人开发好的协议栈源码(cMQTT),在借助他人源码的基础之上,开发一个简单的应用程序,实现上述开源组件发布和订阅的基本功能,然后在此基础上衍生,大家一起探讨探讨,如何自己开发这个MQTT协议栈。
git clone https://github.com/YorkJia/cMQTT.git
配置和编译
./confgure.sh && make install
这时候就会出现libcMQTT.so。这就可以利用它,去简单的编写一些代码了
touch simple_pub.c
int main(int argc, char *argv[]) { int res, loop_cnt = 0, cnt = 0; mqtt_client_t *pclient = NULL; pclient = mqtt_client_new("127.0.0.1", 1883, NULL, "client/01", "kylin", "qwe123"); mqtt_set_will_opt(pclient, MQTT_QOS0, 0, "test/a", NULL); do{ printf("try to connect broker...\n"); res = mqtt_client_connect(pclient); sleep(1); }while(res != SUCCESS_RETURN); while (1) { example_publish(pclient, "Hello World"); mqtt_client_yield(pclient, 1000); } mqtt_client_close(pclient);} int example_publish(mqtt_client_t *pclient, char* data) { char payload[100]; if(pclient == NULL){ return FAIL_RETURN; } memset(payload, 0, sizeof(payload)); sprintf(payload, data); mqtt_publish_simple(pclient, "test/a", MQTT_QOS0, payload, strlen(payload)); }
gcc simple_pub.c -I ../../infra/ -I ../../mqtt/ -L ../../ -lcMQTT -lpthread -o simple_pub
mosquitto_sub -h 127.0.0.1 -t "test/a" -u kylin -P qwe123 ./simple_pub
可以看到mosquitto能够正常接收到hello world字串
至此,一个最简单的发布者代码已经编译完成了
touch simple_sub.c
int main(int argc, char *argv[]) { int res, loop_cnt = 0, cnt = 0; mqtt_client_t *pclient = NULL; pclient = mqtt_client_new("127.0.0.1", 1883, NULL, "client/02", "kylin", "qwe123"); mqtt_set_will_opt(pclient, MQTT_QOS0, 0, "test/a", NULL); do{ printf("try to connect broker...\n"); res = mqtt_client_connect(pclient); sleep(1); }while(res != SUCCESS_RETURN); while (1) { res = mqtt_subscribe(pclient, "test/a", MQTT_QOS0, example_message_arrive, NULL); if(res < 0){ printf("subscribe[%s] fail.\n", "test/a"); } mqtt_client_yield(pclient, 1000); } mqtt_client_close(pclient); } void example_message_arrive(void *pcontext, void *pclient, mqtt_event_msg_t *msg) { mqtt_topic_info_t *topic_info = (mqtt_topic_info_t *)msg->msg; switch (msg->event_type) { case IOTX_MQTT_EVENT_PUBLISH_RECEIVED: /* print topic name and topic message */ printf("Message Arrived:\n"); printf("Topic : %.*s\n", topic_info->topic_len, topic_info->ptopic); printf("Payload: %.*s\n", topic_info->payload_len, topic_info->payload); break; default: break; } }
gcc simple_sub.c -I ../../infra/ -I ../../mqtt/ -L ../../ -lcMQTT -lpthread -o simple_sub
./simple_sub mosquitto_pub -h localhost -V mqttv31 -t 'test/a' -u kylin -P qwe123 -i "c3" -m "Hello World" -M 0
至此,一个最简单的订阅者代码已经编译完成了
mosquitto -c /etc/mosquitto/mosquitto.conf -d
./simple_sub ./simple_pub
说起协议栈的开发,与应用开发相比,一般都会是一个比较大的工程。就好像做菜。 应用开发者就好比去菜市场买菜,回家烹饪(或者点个外卖)。 协议栈开发者就好比从菜种子开始种,直到成熟后,再摘菜回家烹饪。
此文章不讨论实现MQTT的细节和展示协议栈实现的具体代码。而是和大家一起讨论讨论,怎么给菜园子松松土,把种子播进去。
从协议来看,代理的职责是接收订阅者的请求,将消息发送给订阅者,接收发布者的消息。 所以实现代理的方式应该大致如下:
其中,发布者和订阅者都是通过连接的方式接入Broker,epoll负责接收所有的事件数据,并解析和转发响应数据
从协议来看,订阅者的职责是:设置QoS质量,选择需要订阅的主题,并将其封装成报文发送给代理去解析。然后回调接收订阅的消息。 所以实现订阅者的方式应该大致如下:
其中,回调处理主要为业务处理逻辑,如获取的温度,湿度,亮度等信息需要如何处理。 封装和解析的报文包括:CONNECT,CONNACK,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK,PINGREQ,DISCONNECT
从协议来看,发布者的职责是:设置QoS质量,选择需要订阅的主题,并将其封装成报文发送给代理去解析。而发布者同时也可做订阅者,所以也可以回调接收订阅的消息 所以实现发布者(仅发布)的方式应该大致如下:
封装和解析的报文包括: CONNECT,CONNACK,PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,PINGREQ,DISCONNECT
这里列举需要实现的协议特性:
1.主题设置:合理的设置主题和判断主题
2.主题过滤器:合理的使用通配符
3.会话管理:Broker合理的管理和调度众多的会话连接
4.保持连接:合理的判断连接是否持续
5.临终遗嘱:合理的执行遗嘱内容,包括主题,消息
6.响应:合理的设置响应,如服务端在合理的时间内收不到connect报文,应该主动管理会话
7.Qos等级:合理的运用QoS设置报文发送方式
8.清理会话:合理的保留和遗弃上一次会话的消息
9.保留消息:合理的判断是否保留上一次发送的消息
至此,我们演示了MQTT的应用场景,我们既需要基于MQTT开发应用,又需要根据MQTT定制协议
本文基于物联网来介绍一下MQTT
MQTT:MQ Telemetry Transport,消息队列遥测传输协议。 它是非常轻量的消息传递协议,对于与需要较小代码占用和/或网络带宽非常宝贵的远程位置建立连接,它最有用。 MQTT是分布式的软总线,它的消息通过订阅者,发布者,消息代理三个角色实现,它能存在于所有的智能设备上。从而达到万物互联。当然实现物联网的物联网协议也很多,如CoAP,HTTP,UPnP,XMPP等,但MQTT具有更加简单,轻量的优势。
QoS0级别的消息发送,意味着接收者并不会响应消息,发送者也不会做重试判断,所以消息最多可能送达一次,但也有可能无法送达
发送者:
发送这个 PUBLISH报文
接收者:
接收这个PUBLISH报文
QoS1级别的消息发送,意味着消息至少被发送一次,并确定至少送达一次
发送者:
必须在每个新的消息上分配一个报文标识符,发送的PUBLISH报文必须标识为QoS=1,DUP=0,且必须将这个报文当作未确认的报文,直到接收到对应的响应报文
接收者:
响应PUBACK报文必须包含一个报文标识符,需要与接收到的PUBLISH报文相同
发送PUBACK报文后,接收者必须将任何包含相同报文标识符的PUBLISH报文当作一个新的消息。保证下一次仍能正常接收。
QoS2是最高级别的发送方式,他确保消息不被丢失,同时也确保消息不重复。
发送者:
1.为消息分配一个未使用的报文标识符,然后将PUBLISH报文的标识符为置为QoS2 DUP0。
2.在发送PUBLISH报文时,将PUBLISH报文看作是未被确认的,直到收到PUBREC报文。且在收到PUBREC报文后必须发送一个PUBREL报文,该报文必须和PUBLISH报文具有相同的报文标识符。
3.同时,也必须将这个PUBREL报文看作是未确认的,直到从接收者那里收到对应的PUBCOMP报文。
4.最后,一旦发送了对应的PUBREL报文,就不能重发这个PUBLISH报文。
接收者:
1.响应的 PUBREC 报文必须包含报文标识符,这个标识符来自接收到的、已经接受所有权的PUBLISH 报文。
2.在收到对应的 PUBREL 报文之前,接收者必须发送 PUBREC 报文确认任何后续的具有相同标识符的 PUBLISH 报文。 在这种情况下,它不能重复分发消息给任何后续的接收者。
3.响应 PUBREL 报文的 PUBCOMP 报文必须包含与 PUBREL 报文相同的标识符。
4.发送 PUBCOMP 报文之后,接收者必须将包含相同报文标识符的任何后续 PUBLISH 报文当作一个新的发布。
apt update && apt install mosquitto mosquitto-clients
touch /etc/mosquitto/pwfile
创建用户和密码
mosquitto_passwd /etc/mosquitto/pwfile kylin
密码 qwe123
vim /etc/mosquitto/aclfile user kylin topic write test/# topic read test/#
mosquitto -c /etc/mosquitto/mosquitto.conf -d
mosquitto_sub -h localhost -t "test/a" -u kylin -P qwe123 -i "c1" -h 为指定mqtt的host地址 -t 为指定mqtt的主题 -u 为用户 -P 为密码 -i 为process id
mosquitto_pub -h localhost -V mqttv311 -t 'test/a' -u kylin -P qwe123 -i "c3" -m "Hello World" -M 0 其中-h -t -u -i -P 意义与上一致 -V 指定发布消息的版本(mqttv311/mqttv31) -m 为发布的消息字符串 -M 为指定QoS等级
如上图可以看到,在订阅者这里会阻塞轮询消息,直到发布者发送消息,订阅者就能收到消息“Hello World”
那么,随即而来就会产生一个疑问。之前将了这么多概念,那实际上这是怎么实现的呢?
探索如何实现这个事情,就需要利用tcpdump了
TCP抓包:
tcpdump -i lo tcp port 1883 -X
-i: 代表网络接口
tcp: mqtt是基于tcp传输的
port 1883: mqtt默认传输端口为1883
-X:在抓包时,以十六进制和 ASCII 表示打印每个数据包的数据
上面为完整的数据包内容,鉴于TCP有三次握手,所以取第四个数据包来简单解析MQTT数据包。如下
因为这是一个完整的TCP数据包,所以需要去掉一些TCP协议相关的数据 其中
4500 0053 8998 4000 4006 b30a 7f00 0001 7f00 0001 :为TCP的报文头。不具体分析 ae80 075b 6e6a c507 dded ec9e 8018 0200 fe47 0000 0101 080a 265c 053f 265c 053f :传输控制协议报文信息。不具体分析 101d 0004 4d51 5454 04c2 003c 0002 6333 0005 6b79 6c69 6e00 0671 7765 3132 33 :为真正的TCP数据包,也就是MQTT的数据包。主要分析这块数据
101d: 0x10由协议表:2.2.1可以查到,对应CONNECT,0x1d为字节长度,这里计算为29个字节
0004 4d51 5454 04c2 003c:
0004 :协议名字长度为4
4d51 5454:名字为MQTT(ASCII码)(如果是3.1协议,则字串为MQIsdp(4d51 4973 6470),长度为6)
04:版本号:3.1.1版本号为4, 3.1版本号为3。
c2:连接标志,包含:名称,密码,QoS0,清理会话。(3.1.2.3章节)
003c:保持连接时间,默认为60秒。超时情况下会发送PINGREQ报文用于探测broker和client(发布者和订阅者)直接是否仍在线(3.1.2.10章节)
0002 6333: 0002为Client ID 长度为2,6333 ID为字符串c3(也就是process ID),我发布的时候用-i参数指定了c3
0005 6b79 6c69 6e00 0671 7765 3132 33: 这里为账户kylin 密码 qwe123的字符串明文
至此,一个完整的connect包已经解析完成了。在connect之后,其实后面还有许多数据包都能进行解析。
之前我们讨论了EDF调度器的实施策略,在rtems上,我们可以通过修改测试程序来演示一下edf调度器对任务调度的现象。
为了能够开始测试代码,我们需要首先创建三个任务,如下
Task_name[ 1 ] = rtems_build_name( 'T', 'A', '1', ' ' ); Task_name[ 2 ] = rtems_build_name( 'T', 'A', '2', ' ' ); Task_name[ 3 ] = rtems_build_name( 'T', 'A', '3', ' ' ); for ( index = 1 ; index <= 3 ; index++ ) { status = rtems_task_create( Task_name[ index ], 1, RTEMS_MINIMUM_STACK_SIZE, RTEMS_DEFAULT_MODES, RTEMS_DEFAULT_ATTRIBUTES, &Task_id[ index ] ); directive_failed( status, "rtems_task_create loop" ); } for ( index = 1 ; index <= 3 ; index++ ) { status = rtems_task_start( Task_id[ index ], Task_1_through_3, index ); directive_failed( status, "rtems_task_start loop" ); }
3个任务都运行了函数Task_1_through_3,我们可以查看Task_1_through_3函数的实现如下
rtems_task Task_1_through_3( rtems_task_argument argument ) { rtems_id rmid; rtems_id test_rmid; rtems_time_of_day time; rtems_status_code status; int start = 0; status = rtems_rate_monotonic_create( argument, &rmid ); directive_failed( status, "rtems_rate_monotonic_create" ); status = rtems_rate_monotonic_ident( argument, &test_rmid ); directive_failed( status, "rtems_rate_monotonic_ident" ); if ( rmid != test_rmid ) { printf( "RMID's DO NOT MATCH (0x%" PRIxrtems_id " and 0x%" PRIxrtems_id ")\n", rmid, test_rmid ); rtems_test_exit( 0 ); } switch ( argument ) { case 1: while ( FOREVER ) { status = rtems_rate_monotonic_period( rmid, RTEMS_MILLISECONDS_TO_TICKS(8000)); directive_failed( status, "rtems_rate_monotonic_period" ); status = rtems_clock_get_tod( &time ); directive_failed( status, "rtems_clock_get_tod" ); put_name( Task_name[ argument ], FALSE ); print_time( " - executing - ", &time, "\n" ); rtems_task_wake_after(RTEMS_MILLISECONDS_TO_TICKS(2000)); status = rtems_clock_get_tod( &time ); directive_failed( status, "rtems_clock_get_tod" ); put_name( Task_name[ argument ], FALSE ); print_time( " - finished - ", &time, "\n" ); if ( time.second >= 30 && time.minute == 0) { printf( "PERIODS CHECK OK 30s\n"); TEST_END(); rtems_test_exit( 0 ); } } case 2: while ( FOREVER ) { status = rtems_rate_monotonic_period( rmid, RTEMS_MILLISECONDS_TO_TICKS(6000)); directive_failed( status, "rtems_rate_monotonic_period" ); status = rtems_clock_get_tod( &time ); directive_failed( status, "rtems_clock_get_tod" ); put_name( Task_name[ argument ], FALSE ); print_time( " - executing - ", &time, "\n" ); rtems_task_wake_after(RTEMS_MILLISECONDS_TO_TICKS(3000)); directive_failed( status, "rtems_task_wake_after" ); status = rtems_clock_get_tod( &time ); directive_failed( status, "rtems_clock_get_tod" ); put_name( Task_name[ argument ], FALSE ); print_time( " - finished - ", &time, "\n" ); } break; case 3: while ( FOREVER ) { status = rtems_rate_monotonic_period( rmid, RTEMS_MILLISECONDS_TO_TICKS(4000)); directive_failed( status, "rtems_rate_monotonic_period" ); status = rtems_clock_get_tod( &time ); directive_failed( status, "rtems_clock_get_tod" ); put_name( Task_name[ argument ], FALSE ); print_time( " - executing - ", &time, "\n" ); rtems_task_wake_after(RTEMS_MILLISECONDS_TO_TICKS(1000)); directive_failed( status, "rtems_task_wake_after" ); status = rtems_clock_get_tod( &time ); directive_failed( status, "rtems_clock_get_tod" ); put_name( Task_name[ argument ], FALSE ); print_time( " - finished - ", &time, "\n" ); } break; } }
上面的代码,我在每个任务中,按照要求定义了三个任务如下
对于任务的周期创建,这里通过标准接口rtems_rate_monotonic_create
和rtems_rate_monotonic_period
来设置。其中RTEMS_MILLISECONDS_TO_TICKS
会将ms转换成系统的tick值
对于任务的运行时间,这里通过rtems_task_wake_after
来实现,它会默认将进程让出就绪队列,然后休眠超时后,将任务加入就绪队列。
当系统任务运行结束时间超过30s时,会主动退出此测试程序
接下来我们编译运行,查看运行结果
将上述代码运行之后,可以得到如下日志
TA3 - executing - 09:00:00 04/16/2025 TA3 - finished - 09:00:01 04/16/2025 TA2 - executing - 09:00:02 04/16/2025 TA1 - executing - 09:00:04 04/16/2025 TA3 - executing - 09:00:04 04/16/2025 TA2 - finished - 09:00:05 04/16/2025 TA3 - finished - 09:00:05 04/16/2025 TA1 - finished - 09:00:06 04/16/2025 TA2 - executing - 09:00:08 04/16/2025 TA3 - executing - 09:00:08 04/16/2025 TA3 - finished - 09:00:09 04/16/2025 TA2 - finished - 09:00:11 04/16/2025 TA1 - executing - 09:00:12 04/16/2025 TA3 - executing - 09:00:12 04/16/2025 TA3 - finished - 09:00:13 04/16/2025 TA2 - executing - 09:00:14 04/16/2025 TA1 - finished - 09:00:14 04/16/2025 TA3 - executing - 09:00:16 04/16/2025 TA2 - finished - 09:00:17 04/16/2025 TA3 - finished - 09:00:17 04/16/2025 TA1 - executing - 09:00:20 04/16/2025 TA2 - executing - 09:00:20 04/16/2025 TA3 - executing - 09:00:20 04/16/2025 TA3 - finished - 09:00:21 04/16/2025 TA1 - finished - 09:00:22 04/16/2025 TA2 - finished - 09:00:23 04/16/2025 TA3 - executing - 09:00:24 04/16/2025 TA3 - finished - 09:00:25 04/16/2025 TA2 - executing - 09:00:26 04/16/2025 TA1 - executing - 09:00:28 04/16/2025 TA3 - executing - 09:00:28 04/16/2025 TA2 - finished - 09:00:29 04/16/2025 TA3 - finished - 09:00:29 04/16/2025 TA1 - finished - 09:00:30 04/16/2025 PERIODS CHECK OK 30s
其实根据日志的输出,已经很明显看出edf调度器的工作机制了,这里逐步分析一下,方便加深edf调度算法的理解
我逐步分析如下
根据上面的分析,我们可以发现
根据上面的推论,我们还可以发现
根据上面的日志,我们可能发现,任务2每次在周期内都是在第2秒才运行,也就是2/8/20/26。而任务1时间只有1s中,那么我们可以推论,此时操作系统中,还有一个任务周期为1s的任务。它的优先级在任务3和任务2之间。不过此任务不影响我们对edf任务调度的观测和推论。
最后,我们关心任务的执行此时,如下
# cat task.log | grep TA1 | wc -l 8 # cat task.log | grep TA2 | wc -l 10 # cat task.log | grep TA3 | wc -l 16
我们以30s作为任务结束为计数,可以得到如下
可以发现,完全符合edf的调度情况
根据上面的演示,我们清晰的了解了edf调度的运行机制。
本文基于linux发行版安装tensorflow2程序,用作后期的调研评估,如下是步骤
安装步骤如下
pip install -i https://mirrors.aliyun.com/pypi/simple --upgrade pip pip install -i https://mirrors.aliyun.com/pypi/simple tensorflow==2.2.0 pip install -i https://mirrors.aliyun.com/pypi/simple jupyterlab pip install -i https://mirrors.aliyun.com/pypi/simple numpy==1.20.0 pip install -i https://mirrors.aliyun.com/pypi/simple protobuf==3.20.1
值得注意的是,这里protobuf和numpy都是降级了的
为了开发tensorflow2,我们需要使用jupyter开放一个端口,默认8888,如下
{ "NotebookApp" : { "ip": "*", "port": 8888, "password": "", "open_browser": false, "token": "", "allow_root": true } }
为了运行jupyter,可以新建一个shell脚本,名字为run_jupyter.sh,内容如下
jupyter lab --config jupyter_config.json
上述动作完成之后,打开端口可以看到如下
此时我们点击Notebook下的python3进行测试
这里直接打印tensorflow2的版本即可,如下
至此,tensorflow的安装完成了,接下来继续调研tensorflow2