One step in bootcard is heap initialization. To understand memory initialization in more detail, this article takes _Heap_Initialize as the entry point and walks through what it actually does.
From our earlier analysis of the startup flow, we know that RTEMS initializes several heaps:

- _Workspace_Handler_initialization calls _Heap_Initialize
- _Malloc_Initialize calls _Heap_Initialize
- the cache_coherent_heap set up via add_area calls _Heap_Initialize

That is three heap areas in total. This article uses the malloc heap as the example and focuses on its _Heap_Initialize call.
uintptr_t _Heap_Initialize(
  Heap_Control *heap,
  void *heap_area_begin_ptr, // start address
  uintptr_t heap_area_size,  // size
  uintptr_t page_size
)
{
  Heap_Statistics *const stats = &heap->stats;
  uintptr_t const heap_area_begin = (uintptr_t) heap_area_begin_ptr;
  uintptr_t const heap_area_end = heap_area_begin + heap_area_size;
  uintptr_t first_block_begin = 0;
  uintptr_t first_block_size = 0;
  uintptr_t last_block_begin = 0;
  uintptr_t min_block_size = 0;
  bool area_ok = false;
  Heap_Block *first_block = NULL;
  Heap_Block *last_block = NULL;

  // If page_size is unset, default to the CPU alignment; otherwise align the
  // requested value up to the CPU alignment.
  if ( page_size == 0 ) {
    page_size = CPU_ALIGNMENT;
  } else {
    page_size = _Heap_Align_up( page_size, CPU_ALIGNMENT );

    if ( page_size < CPU_ALIGNMENT ) {
      /* Integer overflow */
      return 0;
    }
  }

  // Compute the minimum block size
  min_block_size = _Heap_Min_block_size( page_size );

  // Get the addresses of the first and the last block
  area_ok = _Heap_Get_first_and_last_block(
    heap_area_begin,
    heap_area_size,
    page_size,
    min_block_size,
    &first_block,
    &last_block
  );
  if ( !area_ok ) {
    return 0;
  }

  memset(heap, 0, sizeof(*heap));

#ifdef HEAP_PROTECTION
  heap->Protection.block_initialize = _Heap_Protection_block_initialize_default;
  heap->Protection.block_check = _Heap_Protection_block_check_default;
  heap->Protection.block_error = _Heap_Protection_block_error_default;
#endif

  // First block size = start of the last block minus start of the first block
  first_block_begin = (uintptr_t) first_block;
  last_block_begin = (uintptr_t) last_block;
  first_block_size = last_block_begin - first_block_begin;

  // Set up the first block: per the block layout, prev_size holds the heap end.
  // next points at the free-list tail and prev at the free-list head, which
  // effectively initializes the free list.
  /* First block */
  first_block->prev_size = heap_area_end;
  first_block->size_and_flag = first_block_size | HEAP_PREV_BLOCK_USED;
  first_block->next = _Heap_Free_list_tail( heap );
  first_block->prev = _Heap_Free_list_head( heap );
  _Heap_Protection_block_initialize( heap, first_block );

  // Record the heap information: page size, minimum block size, heap bounds,
  // first block address, last block address.
  // Make first_block the first element of the free list.
  /* Heap control */
  heap->page_size = page_size;
  heap->min_block_size = min_block_size;
  heap->area_begin = heap_area_begin;
  heap->area_end = heap_area_end;
  heap->first_block = first_block;
  heap->last_block = last_block;
  _Heap_Free_list_head( heap )->next = first_block;
  _Heap_Free_list_tail( heap )->prev = first_block;

  // Set the last block's prev_size to the first block's size
  /* Last block */
  last_block->prev_size = first_block_size;
  last_block->size_and_flag = 0;
  _Heap_Set_last_block_size( heap );
  _Heap_Protection_block_initialize( heap, last_block );

  // Statistics: total size, free size, block counts
  /* Statistics */
  stats->size = first_block_size;
  stats->free_size = first_block_size;
  stats->min_free_size = first_block_size;
  stats->free_blocks = 1;
  stats->max_free_blocks = 1;

  _Heap_Protection_set_delayed_free_fraction( heap, 2 );

  _HAssert( _Heap_Is_aligned( heap->page_size, CPU_ALIGNMENT ) );
  _HAssert( _Heap_Is_aligned( heap->min_block_size, page_size ) );
  _HAssert( _Heap_Is_aligned( _Heap_Alloc_area_of_block( first_block ), page_size ) );
  _HAssert( _Heap_Is_aligned( _Heap_Alloc_area_of_block( last_block ), page_size ) );

  // Return the first block's size
  return first_block_size;
}
As the comments indicate, heap initialization essentially fills in the Heap_Control structure: it computes the positions and sizes of the first and last Heap_Block, writes each block's header information (prev_size, size_and_flag, and the free-list links), and finally records the heap statistics.
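To make the address arithmetic concrete, here is a standalone sketch, compilable with any host C compiler. It is a simplified model, not the actual _Heap_Get_first_and_last_block, which additionally accounts for the block header size and the minimum block size; all addresses here are made up for illustration.

#include <stdint.h>
#include <stdio.h>

#define PAGE_SIZE 16u /* hypothetical page size; same value as in the GDB dump below */

/* Align helpers with the same semantics as _Heap_Align_up/_Heap_Align_down
   (valid for power-of-two alignments). */
static uintptr_t align_up( uintptr_t value, uintptr_t alignment )
{
  return ( value + alignment - 1 ) & ~( alignment - 1 );
}

static uintptr_t align_down( uintptr_t value, uintptr_t alignment )
{
  return value & ~( alignment - 1 );
}

int main( void )
{
  /* A made-up heap area, deliberately misaligned at both ends. */
  uintptr_t area_begin = 0x100007;
  uintptr_t area_size  = 0x10000;
  uintptr_t area_end   = area_begin + area_size;

  /* Simplified model: the first block starts at the aligned-up begin, the
     last block sits at the aligned-down end, and everything in between
     becomes the single initial free block. */
  uintptr_t first_block = align_up( area_begin, PAGE_SIZE );
  uintptr_t last_block  = align_down( area_end, PAGE_SIZE );
  uintptr_t first_block_size = last_block - first_block;

  printf(
    "first_block=%#lx last_block=%#lx first_block_size=%#lx\n",
    (unsigned long) first_block,
    (unsigned long) last_block,
    (unsigned long) first_block_size
  );
  return 0;
}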
We can observe the constructed heap with GDB. First look at the Heap_Control structure, which is defined as follows:
struct Heap_Control {
  Heap_Block free_list;
  uintptr_t page_size;
  uintptr_t min_block_size;
  uintptr_t area_begin;
  uintptr_t area_end;
  Heap_Block *first_block;
  Heap_Block *last_block;
  Heap_Statistics stats;
#ifdef HEAP_PROTECTION
  Heap_Protection Protection;
#endif
};
The actual contents, before and after initialization, look like this:
(gdb) p _Malloc_Heap
$1 = {free_list = {prev_size = 0, size_and_flag = 0, next = 0x0, prev = 0x0},
  page_size = 0, min_block_size = 0, area_begin = 0, area_end = 0,
  first_block = 0x0, last_block = 0x0, stats = {lifetime_allocated = 0,
    lifetime_freed = 0, size = 0, free_size = 0, min_free_size = 0,
    free_blocks = 0, max_free_blocks = 0, used_blocks = 0, max_search = 0,
    searches = 0, allocs = 0, failed_allocs = 0, frees = 0, resizes = 0}}
(gdb) p _Malloc_Heap
$2 = {free_list = {prev_size = 0, size_and_flag = 0, next = 0x1342a0,
    prev = 0x1342a0}, page_size = 16, min_block_size = 32,
  area_begin = 1262239, area_end = 1072431104, first_block = 0x1342a0,
  last_block = 0x3febfff0, stats = {lifetime_allocated = 0,
    lifetime_freed = 0, size = 1071168848, free_size = 1071168848,
    min_free_size = 1071168848, free_blocks = 1, max_free_blocks = 1,
    used_blocks = 0, max_search = 0, searches = 0, allocs = 0,
    failed_allocs = 0, frees = 0, resizes = 0}}
The Heap_Block structure is defined as follows:
struct Heap_Block {
  uintptr_t prev_size;
  uintptr_t size_and_flag;
  Heap_Block *next;
  Heap_Block *prev;
};
The actual first and last blocks look like this:
(gdb) p *(Heap_Block *)0x1342a0
$5 = {prev_size = 1072431104, size_and_flag = 1071168849,
  next = 0x102c90 <_Malloc_Heap>, prev = 0x102c90 <_Malloc_Heap>}
(gdb) p *(Heap_Block *)0x3febfff0
$6 = {prev_size = 1071168848, size_and_flag = 18446744072638382768,
  next = 0x0, prev = 0x0}
The Heap_Statistics structure is defined as follows:
typedef struct {
  /**
   * @brief Lifetime number of bytes allocated from this heap.
   *
   * This value is an integral multiple of the page size.
   */
  uint64_t lifetime_allocated;

  /**
   * @brief Lifetime number of bytes freed to this heap.
   *
   * This value is an integral multiple of the page size.
   */
  uint64_t lifetime_freed;

  /**
   * @brief Size of the allocatable area in bytes.
   *
   * This value is an integral multiple of the page size.
   */
  uintptr_t size;

  /**
   * @brief Current free size in bytes.
   *
   * This value is an integral multiple of the page size.
   */
  uintptr_t free_size;

  /**
   * @brief Minimum free size ever in bytes.
   *
   * This value is an integral multiple of the page size.
   */
  uintptr_t min_free_size;

  /**
   * @brief Current number of free blocks.
   */
  uint32_t free_blocks;

  /**
   * @brief Maximum number of free blocks ever.
   */
  uint32_t max_free_blocks;

  /**
   * @brief Current number of used blocks.
   */
  uint32_t used_blocks;

  /**
   * @brief Maximum number of blocks searched ever.
   */
  uint32_t max_search;

  /**
   * @brief Total number of searches.
   */
  uint32_t searches;

  /**
   * @brief Total number of successful allocations.
   */
  uint32_t allocs;

  /**
   * @brief Total number of failed allocations.
   */
  uint32_t failed_allocs;

  /**
   * @brief Total number of successful frees.
   */
  uint32_t frees;

  /**
   * @brief Total number of successful resizes.
   */
  uint32_t resizes;
} Heap_Statistics;
The actual statistics were already printed as part of the heap structure above; extracted here for reference:
(gdb) p _Malloc_Heap->stats
$10 = {lifetime_allocated = 0, lifetime_freed = 0, size = 0, free_size = 0,
  min_free_size = 0, free_blocks = 0, max_free_blocks = 0, used_blocks = 0,
  max_search = 0, searches = 0, allocs = 0, failed_allocs = 0, frees = 0,
  resizes = 0}
(gdb) p _Malloc_Heap->stats
$11 = {lifetime_allocated = 0, lifetime_freed = 0, size = 1071168848,
  free_size = 1071168848, min_free_size = 1071168848, free_blocks = 1,
  max_free_blocks = 1, used_blocks = 0, max_search = 0, searches = 0,
  allocs = 0, failed_allocs = 0, frees = 0, resizes = 0}
With this we have a clear picture of how RTEMS initializes its heaps. These structures will be very useful later when debugging heap issues.
RTEMS supports blocking task scheduling. The name may sound odd, but in plain terms it means that when a task hits an I/O event or has to wait for something, it can be put into the blocked state and removed from the ready queue.
Blocking task scheduling can be tested in two ways.

Both of the test programs center on two directives: rtems_task_suspend and rtems_task_wake_after.

To exercise blocking, this article uses rtems_task_wake_after; rtems_task_suspend behaves the same way, so it is not demonstrated separately.
To test rtems_task_wake_after we can adapt the existing validation program: in the Process_asr function, change rtems_task_wake_after( RTEMS_YIELD_PROCESSOR ); to status = rtems_task_wake_after( rtems_clock_get_ticks_per_second() );
Here 100 ticks correspond to 1 s. After changing the argument from 0 to 100 ticks, rtems_task_wake_after actively blocks the thread, as the following code shows:
rtems_status_code rtems_task_wake_after(
  rtems_interval ticks
)
{
  /*
   * It is critical to obtain the executing thread after thread dispatching is
   * disabled on SMP configurations.
   */
  Thread_Control  *executing;
  Per_CPU_Control *cpu_self;

  cpu_self = _Thread_Dispatch_disable();
  executing = _Per_CPU_Get_executing( cpu_self );

  if ( ticks == 0 ) {
    _Thread_Yield( executing );
  } else {
    _Thread_Set_state( executing, STATES_WAITING_FOR_TIME );
    _Thread_Wait_flags_set( executing, THREAD_WAIT_STATE_BLOCKED );
    _Thread_Add_timeout_ticks( executing, cpu_self, ticks );
  }

  _Thread_Dispatch_direct( cpu_self );
  return RTEMS_SUCCESSFUL;
}
As we can see, when ticks is nonzero, _Thread_Set_state( executing, STATES_WAITING_FOR_TIME ) is called. That function takes the thread's state lock and then calls _Thread_Set_state_locked, which is implemented as follows:
States_Control _Thread_Set_state_locked(
  Thread_Control *the_thread,
  States_Control  state
)
{
  States_Control previous_state;
  States_Control next_state;

  _Assert( state != 0 );
  _Assert( _Thread_State_is_owner( the_thread ) );

  previous_state = the_thread->current_state;
  next_state = _States_Set( state, previous_state );
  the_thread->current_state = next_state;

  if ( _States_Is_ready( previous_state ) ) {
    _Scheduler_Block( the_thread );
  }

  return previous_state;
}
We can see that it first merges the blocking state into the thread's current state:
previous_state = the_thread->current_state;
next_state = _States_Set( state, previous_state );
the_thread->current_state = next_state;
Then, if the previous state was ready, it blocks the task directly:
_Scheduler_Block( the_thread );
_Scheduler_Block is implemented as follows:
static inline void _Scheduler_Block( Thread_Control *the_thread )
{
  Chain_Node              *node;
  const Chain_Node        *tail;
  Scheduler_Node          *scheduler_node;
  const Scheduler_Control *scheduler;
  ISR_lock_Context         lock_context;

  node = _Chain_First( &the_thread->Scheduler.Scheduler_nodes );
  tail = _Chain_Immutable_tail( &the_thread->Scheduler.Scheduler_nodes );

  scheduler_node = SCHEDULER_NODE_OF_THREAD_SCHEDULER_NODE( node );
  scheduler = _Scheduler_Node_get_scheduler( scheduler_node );

  _Scheduler_Acquire_critical( scheduler, &lock_context );
  ( *scheduler->Operations.block )( scheduler, the_thread, scheduler_node );
  _Scheduler_Release_critical( scheduler, &lock_context );

  node = _Chain_Next( node );

  while ( node != tail ) {
    scheduler_node = SCHEDULER_NODE_OF_THREAD_SCHEDULER_NODE( node );
    scheduler = _Scheduler_Node_get_scheduler( scheduler_node );

    _Scheduler_Acquire_critical( scheduler, &lock_context );
    ( *scheduler->Operations.withdraw_node )(
      scheduler,
      the_thread,
      scheduler_node,
      THREAD_SCHEDULER_BLOCKED
    );
    _Scheduler_Release_critical( scheduler, &lock_context );

    node = _Chain_Next( node );
  }
}
Here we can see that it takes the thread's first scheduler node, obtains the corresponding Scheduler_Control, and calls that scheduler's block operation directly; it then walks the remaining scheduler nodes and invokes the withdraw_node callback on each of them.
We can inspect the scheduler's operations table, including withdraw_node, in GDB:
(gdb) p *(Scheduler_Control *)0x2d248
$18 = {context = 0x100820 <_Configuration_Scheduler_priority_dflt>,
  Operations = {initialize = 0x1eaf0 <_Scheduler_priority_Initialize>,
    schedule = 0x1ef10 <_Scheduler_priority_Schedule>,
    yield = 0x1f130 <_Scheduler_priority_Yield>,
    block = 0x1ebd0 <_Scheduler_priority_Block>,
    unblock = 0x1efd0 <_Scheduler_priority_Unblock>,
    update_priority = 0x1ecf0 <_Scheduler_priority_Update_priority>,
    map_priority = 0x1ea30 <_Scheduler_default_Map_priority>,
    unmap_priority = 0x1ea40 <_Scheduler_default_Unmap_priority>,
    ask_for_help = 0x0, reconsider_help_request = 0x0, withdraw_node = 0x0,
    make_sticky = 0x22720 <_Scheduler_default_Sticky_do_nothing>,
    clean_sticky = 0x22720 <_Scheduler_default_Sticky_do_nothing>,
    pin = 0x22760 <_Scheduler_default_Pin_or_unpin_do_nothing>,
    unpin = 0x22760 <_Scheduler_default_Pin_or_unpin_do_nothing>,
    add_processor = 0x0, remove_processor = 0x0,
    node_initialize = 0x1eb40 <_Scheduler_priority_Node_initialize>,
    node_destroy = 0x1ea50 <_Scheduler_default_Node_destroy>,
    release_job = 0x1ea60 <_Scheduler_default_Release_job>,
    cancel_job = 0x1ea70 <_Scheduler_default_Cancel_job>,
    start_idle = 0x1ea80 <_Scheduler_default_Start_idle>,
    set_affinity = 0x22770 <_Scheduler_default_Set_affinity>},
  maximum_priority = 255, name = 1431323680,
  is_non_preempt_mode_supported = true}
Here withdraw_node is not implemented (it is NULL). A thread gets one scheduler node per configured scheduler, so with a single scheduler each thread has exactly one scheduler_node. See the following definition:
#define _CONFIGURE_SCHEDULER_COUNT RTEMS_ARRAY_SIZE( _Scheduler_Table )

const size_t _Scheduler_Count = _CONFIGURE_SCHEDULER_COUNT;
Accordingly, in _Thread_Initialize_scheduler_and_wait_nodes we can see a loop of the form:
do {
  ......
} while ( scheduler_index < _Scheduler_Count );
The block operation itself simply removes the task from the ready queue.
Among the RTEMS scheduling scenarios we have covered, the first is active scheduling, implemented by _Scheduler_Schedule. From the surrounding code we can tell that when the system delivers a signal to a thread whose context disables preemption, the kernel actively schedules so that higher-priority threads get a chance to run before the signal is handled. This article explores this scheduling path through an experiment.
To test RTEMS active scheduling, we use the following test code:
rtems_task Init(
  rtems_task_argument argument
)
{
  rtems_status_code status;

  TEST_BEGIN();

  Task_name[ 1 ] = rtems_build_name( 'T', 'A', '1', ' ' );
  Task_name[ 2 ] = rtems_build_name( 'T', 'A', '2', ' ' );

  status = rtems_task_create(
    Task_name[ 1 ],
    4,
    RTEMS_MINIMUM_STACK_SIZE * 2,
    RTEMS_DEFAULT_MODES,
    RTEMS_DEFAULT_ATTRIBUTES,
    &Task_id[ 1 ]
  );
  directive_failed( status, "rtems_task_create of TA1" );

  status = rtems_task_create(
    Task_name[ 2 ],
    4,
    RTEMS_MINIMUM_STACK_SIZE,
    RTEMS_DEFAULT_MODES,
    RTEMS_DEFAULT_ATTRIBUTES,
    &Task_id[ 2 ]
  );
  directive_failed( status, "rtems_task_create of TA2" );

  status = rtems_task_start( Task_id[ 1 ], Task_1, 0 );
  directive_failed( status, "rtems_task_start of TA1" );

  status = rtems_task_start( Task_id[ 2 ], Task_2, 0 );
  directive_failed( status, "rtems_task_start of TA2" );

  Timer_name[ 1 ] = rtems_build_name( 'T', 'M', '1', ' ' );

  status = rtems_timer_create( Timer_name[ 1 ], &Timer_id[ 1 ] );
  directive_failed( status, "rtems_timer_create of TM1" );

  rtems_task_exit();
}
To get the task running, we implement the Task_1 function:
rtems_task Task_1(
  rtems_task_argument argument
)
{
  rtems_mode        previous_mode;
  rtems_status_code status;

  puts( "TA1 - rtems_signal_catch - RTEMS_INTERRUPT_LEVEL( 0 )" );
  status = rtems_signal_catch( Process_asr, RTEMS_INTERRUPT_LEVEL(0) );
  directive_failed( status, "rtems_signal_catch" );

  status = rtems_task_mode( RTEMS_NO_PREEMPT, RTEMS_PREEMPT_MASK, &previous_mode );
  printf("Kylin: previous_mode=%#x \n", previous_mode);

  puts( "TA1 - rtems_signal_send - RTEMS_SIGNAL_16 to self" );
  status = rtems_signal_send( RTEMS_SELF, RTEMS_SIGNAL_16 );
  directive_failed( status, "rtems_signal_send" );

  puts( "TA1 - rtems_task_exit" );
  rtems_task_exit();
}
In this example, rtems_signal_catch registers Process_asr as the ASR with RTEMS_INTERRUPT_LEVEL(0), and, so that the test behaves as intended, rtems_task_mode makes the task non-preemptible.
The code analysis has two parts. The first is rtems_signal_catch, where the rtems_mode mode_set parameter is worth noting:
asr->mode_set = mode_set;
The other part is the rtems_signal_send function, whose call chains are:
rtems_signal_send--->_Thread_Add_post_switch_action--->action->handler = handler;
rtems_signal_send--->_Thread_Dispatch_enable--->_Thread_Do_dispatch--->_Thread_Run_post_switch_actions---> ( *action->handler )( executing, action, &lock_context );
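This post-switch action mechanism can be modeled with a small standalone sketch. The names post_switch_add, post_switch_run, and signal_action are invented stand-ins for _Thread_Add_post_switch_action, _Thread_Run_post_switch_actions, and _Signal_Action_handler; the real code keeps the action list per thread and runs it under an ISR lock.

#include <stddef.h>
#include <stdio.h>

/* Minimal model: rtems_signal_send appends an action, and the dispatcher
   runs all queued actions after the context switch. */
typedef struct Action {
  struct Action *next;
  void (*handler)( struct Action *self );
} Action;

static Action *pending; /* per-thread list in the real code */

static void post_switch_add( Action *action, void (*handler)( Action * ) )
{
  action->handler = handler; /* mirrors: action->handler = handler; */
  action->next = pending;
  pending = action;
}

static void post_switch_run( void ) /* ~ _Thread_Run_post_switch_actions */
{
  while ( pending != NULL ) {
    Action *action = pending;
    pending = action->next;
    ( *action->handler )( action ); /* mirrors: ( *action->handler )( ... ); */
  }
}

static void signal_action( Action *self ) /* stands in for _Signal_Action_handler */
{
  (void) self;
  printf( "run ASR handler\n" );
}

int main( void )
{
  static Action action;
  post_switch_add( &action, signal_action ); /* rtems_signal_send side */
  post_switch_run();                         /* _Thread_Do_dispatch side */
  return 0;
}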
The handler callback here is _Signal_Action_handler. Its main job is to invoke the ASR handler, i.e. the registered callback. It is worth noting that before the signal handler is called, the following logic runs:
normal_is_preemptible = executing->is_preemptible;
executing->is_preemptible = _Modes_Is_preempt( asr->mode_set );

if ( executing->is_preemptible && !normal_is_preemptible ) {
  Per_CPU_Control *cpu_self;

  cpu_self = _Thread_Dispatch_disable_critical( lock_context );
  _Scheduler_Schedule( executing );
  _Thread_State_release( executing, lock_context );
  _Thread_Dispatch_direct( cpu_self );
} else {
  _Thread_State_release( executing, lock_context );
}

( *asr->handler )( signal_set );
This code shows that if the ASR mode makes the thread preemptible while the thread itself was declared non-preemptible, _Scheduler_Schedule is actively called to give up the CPU and let higher-priority tasks run once.
In plain terms: when a non-preemptible task is about to run an ASR whose mode re-enables preemption, the thread temporarily becomes preemptible. At that transition _Scheduler_Schedule is called first, _Thread_Dispatch_direct then runs the current highest-priority task, and only after that does the ASR handler execute.
The ASR handler here is Process_asr; before Process_asr runs, the highest-priority ready task in the system gets one chance to run.
(gdb) bt
#0  _Scheduler_priority_Schedule (scheduler=0x2d268 <_Scheduler_Table>, the_thread=0x1059e0 <_RTEMS_tasks_Objects+1512>)
    at ../../../cpukit/include/rtems/score/schedulerimpl.h:108
#1  0x000000000001d5c0 in _Scheduler_Schedule (the_thread=0x1059e0 <_RTEMS_tasks_Objects+1512>)
    at ../../../cpukit/include/rtems/score/schedulerimpl.h:227
#2  _Signal_Action_handler (executing=0x1059e0 <_RTEMS_tasks_Objects+1512>, action=0x105ee8 <_RTEMS_tasks_Objects+2800>, lock_context=0x10eca0)
    at ../../../cpukit/rtems/src/signalsend.c:103
#3  0x000000000001f5f0 in _Thread_Run_post_switch_actions (executing=0x1059e0 <_RTEMS_tasks_Objects+1512>)
    at ../../../cpukit/score/src/threaddispatch.c:270
#4  _Thread_Do_dispatch (cpu_self=0x102080 <_Per_CPU_Information>, level=832)
    at ../../../cpukit/score/src/threaddispatch.c:356
#5  0x000000000001f870 in _Thread_Dispatch_enable (cpu_self=cpu_self@entry=0x102080 <_Per_CPU_Information>)
    at ../../../cpukit/score/src/threaddispatch.c:389
#6  0x000000000001d6dc in rtems_signal_send (id=id@entry=0, signal_set=signal_set@entry=65536)
    at ../../../cpukit/rtems/src/signalsend.c:205
#7  0x0000000000019728 in Task_1 (argument=<optimized out>)
    at ../../../testsuites/sptests/sp14/task1.c:71
#8  0x000000000001f9f0 in _Thread_Handler ()
    at ../../../cpukit/score/src/threadhandler.c:164
#9  0x000000000001f900 in ?? ()
The backtrace shows that rtems_signal_send calls _Thread_Dispatch_enable, _Thread_Do_dispatch then triggers the _Signal_Action_handler callback, and inside that callback one active schedule happens before the ASR handler runs, exploiting the scheduling point created when the task switches from non-preemptible to the preemptible ASR mode.
From the analysis above, a schedule is attempted before the ASR handler runs whenever possible. Conversely, if we set asr->mode_set to non-preemptible in the signal catch call, the thread state inside _Signal_Action_handler stays non-preemptible, and this signal will not trigger the active schedule. For example:
rtems_task Task_1(
  rtems_task_argument argument
)
{
  rtems_mode        previous_mode;
  rtems_status_code status;

  puts( "TA1 - rtems_signal_catch - RTEMS_INTERRUPT_LEVEL( 0 )" );
  status = rtems_signal_catch( Process_asr, RTEMS_INTERRUPT_LEVEL(0) | RTEMS_NO_PREEMPT );
  directive_failed( status, "rtems_signal_catch" );

  status = rtems_task_mode( RTEMS_NO_PREEMPT, RTEMS_PREEMPT_MASK, &previous_mode );
  printf("Kylin: previous_mode=%#x \n", previous_mode);

  puts( "TA1 - rtems_signal_send - RTEMS_SIGNAL_16 to self" );
  status = rtems_signal_send( RTEMS_SELF, RTEMS_SIGNAL_16 );
  directive_failed( status, "rtems_signal_send" );

  puts( "TA1 - rtems_task_exit" );
  rtems_task_exit();
}
Here rtems_signal_catch sets asr->mode_set to non-preemptible, so this time the active schedule is skipped.
With that, we have used signals to exercise RTEMS's active scheduling function.
Priority scheduling, as the name suggests, schedules based on task priority: a higher-priority ready task should always be scheduled first. This article covers RTEMS's default scheduler, the priority scheduling algorithm.
By default, the priority scheduler achieves O(1) scheduling with a bit map. Its context structure is:
typedef struct {
  /**
   * @brief Basic scheduler context.
   */
  Scheduler_Context Base;

  /**
   * @brief Bit map to indicate non-empty ready queues.
   */
  Priority_bit_map_Control Bit_map;

  /**
   * @brief One ready queue per priority level.
   */
  Chain_Control Ready[ RTEMS_ZERO_LENGTH_ARRAY ];
} Scheduler_priority_Context;
The bit map structure inside it is implemented as:
typedef struct {
  /**
   * @brief Each sixteen bit entry in this word is associated with one of the
   * sixteen entries in the bit map.
   */
  Priority_bit_map_Word major_bit_map;

  /**
   * @brief Each bit in the bit map indicates whether or not there are threads
   * ready at a particular priority.
   *
   * The mapping of individual priority levels to particular bits is processor
   * dependent as is the value of each bit used to indicate that threads are
   * ready at that priority.
   */
  Priority_bit_map_Word bit_map[ 16 ];
} Priority_bit_map_Control;
From these two data structures we can conclude the following (a sketch of the two-level bitmap follows the list):
- The scheduler context lives in Base, including items such as the Lock and the Processor_mask.
- The bit map tracks ready priorities: the major level is set in major_bit_map, the minor level in bit_map.
- The O(1) bitmap lookup is enabled by the aarch64 clz instruction.
- Each priority value computed from the bit map corresponds to one Ready chain.
- Ready holds the chains for up to 256 priority levels.
- This "zipper" of an array plus chains locates the highest-priority task quickly.
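The following standalone sketch models the two-level lookup. The helper names are invented; the real code lives in _Priority_bit_map_Add, _Priority_bit_map_Remove, and _Priority_bit_map_Get_highest, and uses the _Bitfield_Find_first_bit shown later. As in RTEMS, bits are numbered from the MSB, so the numerically smallest (most urgent) priority is found first.

#include <stdint.h>
#include <stdio.h>

/* Simplified model of Priority_bit_map_Control: a 16-bit major word selects
   one of 16 minor words; together they cover 256 priorities. */
static uint16_t major_bit_map;
static uint16_t bit_map[ 16 ];

static unsigned first_bit( uint16_t word ) /* ~ _Bitfield_Find_first_bit; word must be nonzero */
{
  /* __builtin_clz works on a 32-bit unsigned int; drop the 16 extra leading bits. */
  return (unsigned) __builtin_clz( word ) - 16;
}

static void map_add( unsigned priority ) /* ~ _Priority_bit_map_Add */
{
  unsigned major = priority / 16, minor = priority % 16;
  bit_map[ major ] |= (uint16_t) ( 0x8000u >> minor );
  major_bit_map    |= (uint16_t) ( 0x8000u >> major );
}

static void map_remove( unsigned priority ) /* ~ _Priority_bit_map_Remove */
{
  unsigned major = priority / 16, minor = priority % 16;
  bit_map[ major ] &= (uint16_t) ~( 0x8000u >> minor );
  if ( bit_map[ major ] == 0 )
    major_bit_map &= (uint16_t) ~( 0x8000u >> major );
}

static unsigned map_highest( void ) /* ~ _Priority_bit_map_Get_highest */
{
  unsigned major = first_bit( major_bit_map );
  unsigned minor = first_bit( bit_map[ major ] );
  return major * 16 + minor;
}

int main( void )
{
  map_add( 200 ); map_add( 37 ); map_add( 4 );
  printf( "highest = %u\n", map_highest() ); /* prints 4 */
  map_remove( 4 );
  printf( "highest = %u\n", map_highest() ); /* prints 37 */
  return 0;
}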
The core of the priority scheduler's fast priority lookup is the clz instruction, which counts the number of zero bits before the first 1 is encountered, starting from the most significant bit.
RTEMS uses the GNU built-in version:
static inline unsigned int _Bitfield_Find_first_bit(
  unsigned int value
)
{
  unsigned int bit_number;

#if ( CPU_USE_GENERIC_BITFIELD_CODE == FALSE )
  _CPU_Bitfield_Find_first_bit( value, bit_number );
#elif defined(__GNUC__)
  bit_number = (unsigned int) __builtin_clz( value )
    - __SIZEOF_INT__ * __CHAR_BIT__ + 16;
#else
  if ( value < 0x100 ) {
    bit_number = _Bitfield_Leading_zeros[ value ] + 8;
  } else {
    bit_number = _Bitfield_Leading_zeros[ value >> 8 ];
  }
#endif

  return bit_number;
}
_Bitfield_Find_first_bit returns the number of zeros before the first 1 bit, i.e. the bit number of the first set bit, counted from the MSB of the 16-bit word.
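A quick standalone check of the GNU branch above (assuming a 32-bit unsigned int and a GCC/Clang compiler, so that __SIZEOF_INT__ * __CHAR_BIT__ is 32):

#include <stdio.h>

int main( void )
{
  /* _Bitfield_Find_first_bit works on 16-bit values, numbering bits from the
     MSB: value 0x8000 is bit 0, value 0x0001 is bit 15. With a 32-bit int,
     the formula clz(value) - 32 + 16 strips the 16 extra leading bits. */
  unsigned int values[] = { 0x8000u, 0x0800u, 0x0001u };

  for ( int i = 0; i < 3; ++i )
    printf(
      "first bit of %#06x = %u\n",
      values[ i ],
      (unsigned) __builtin_clz( values[ i ] ) - 32u + 16u
    );
  /* prints 0, 4, 15 */
  return 0;
}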
Setting and updating the bit map requires setting the major bit, and then setting the minor bit in the word selected by the major index. The code is:
static inline void _Priority_bit_map_Add (
  Priority_bit_map_Control     *bit_map,
  Priority_bit_map_Information *bit_map_info
)
{
  *bit_map_info->minor |= bit_map_info->ready_minor;
  bit_map->major_bit_map |= bit_map_info->ready_major;
}
The major_bit_map update is straightforward, a direct OR. The minor pointer, however, has to select the right array entry based on the major index, which is set up during initialization.
In the function _Priority_bit_map_Initialize_information:
bit_map_info->minor = &bit_map->bit_map[ _Priority_Bits_index( major ) ];
As we can see, minor holds the address of bit_map->bit_map[x].
Likewise, removal from the bitmap is easy to follow:
static inline void _Priority_bit_map_Remove (
  Priority_bit_map_Control     *bit_map,
  Priority_bit_map_Information *bit_map_info
)
{
  *bit_map_info->minor &= bit_map_info->block_minor;

  if ( *bit_map_info->minor == 0 )
    bit_map->major_bit_map &= bit_map_info->block_major;
}
Note that the minor and major bits are cleared with a bitwise AND; the detail worth noticing is how the masks are prepared:
mask = _Priority_Mask( major );
bit_map_info->ready_major = mask;
bit_map_info->block_major = (Priority_bit_map_Word) ~mask;

mask = _Priority_Mask( minor );
bit_map_info->ready_minor = mask;
bit_map_info->block_minor = (Priority_bit_map_Word) ~mask;
The block masks we AND with are the bitwise inverse of the ready masks, so the AND clears exactly the target bit.
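A tiny standalone illustration of this invert-then-AND trick (the values are arbitrary):

#include <stdint.h>
#include <stdio.h>

int main( void )
{
  uint16_t minor = 0x0800;                        /* another priority is already ready */
  uint16_t ready_mask = 0x0010;                   /* like mask = _Priority_Mask( minor ) */
  uint16_t block_mask = (uint16_t) ~ready_mask;   /* precomputed inverse */

  minor |= ready_mask;   /* add: minor is now 0x0810 */
  minor &= block_mask;   /* remove: back to 0x0800, only the target bit cleared */
  printf( "%#06x\n", minor ); /* prints 0x0800 */
  return 0;
}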
Combining the array with the per-priority chains gives the "zipper" structure described next.
Each priority level has one array slot, and each slot holds a chain used as a FIFO queue, i.e.:
Chain_Control Ready[ RTEMS_ZERO_LENGTH_ARRAY ];
The priority scheduler manages these chains as queues: every insertion appends at the tail, every lookup reads from the head, and extracting a node from a doubly linked chain is itself O(1).
So the queue-based priority chains operate in O(1) overall. The code is:
// Lookup
static inline Chain_Node *_Chain_First(
  const Chain_Control *the_chain
)
{
  return _Chain_Immutable_head( the_chain )->next;
}

// Insertion
static inline void _Chain_Append_unprotected(
  Chain_Control *the_chain,
  Chain_Node    *the_node
)
{
  Chain_Node *tail;
  Chain_Node *old_last;

  _Assert( _Chain_Is_node_off_chain( the_node ) );

  tail = _Chain_Tail( the_chain );
  old_last = tail->previous;

  the_node->next = tail;
  tail->previous = the_node;
  old_last->next = the_node;
  the_node->previous = old_last;
}

// Removal
static inline void _Chain_Extract_unprotected(
  Chain_Node *the_node
)
{
  Chain_Node *next;
  Chain_Node *previous;

  _Assert( !_Chain_Is_node_off_chain( the_node ) );

  next = the_node->next;
  previous = the_node->previous;
  next->previous = previous;
  previous->next = next;

#if defined(RTEMS_DEBUG)
  _Chain_Set_off_chain( the_node );
#endif
}
Given the analysis above, initializing the scheduler is just a matter of clearing the bitmap and initializing the array of chains:
void _Scheduler_priority_Initialize( const Scheduler_Control *scheduler )
{
  Scheduler_priority_Context *context =
    _Scheduler_priority_Get_context( scheduler );

  _Priority_bit_map_Initialize( &context->Bit_map );
  _Scheduler_priority_Ready_queue_initialize(
    &context->Ready[ 0 ],
    scheduler->maximum_priority
  );
}
_Priority_bit_map_Initialize simply memsets the bit map structure to zero.
_Scheduler_priority_Ready_queue_initialize walks all the Ready queues and initializes each one as an empty chain (Chain_Control overlays the head and tail nodes into one compact structure, which is why head->previous is set to NULL):
static inline void _Chain_Initialize_empty(
  Chain_Control *the_chain
)
{
  Chain_Node *head;
  Chain_Node *tail;

  _Assert( the_chain != NULL );

  head = _Chain_Head( the_chain );
  tail = _Chain_Tail( the_chain );

  head->next = tail;
  head->previous = NULL;
  tail->previous = head;
}
Making a task ready means inserting it into its chain and then updating the priority bitmap. The corresponding functions are:
- _Scheduler_priority_Ready_queue_update
- _Scheduler_priority_Ready_queue_enqueue
update initializes or refreshes the ready-queue and bitmap bookkeeping for a priority, while enqueue appends the node to the ready chain and sets the bitmap bits with an OR:
static inline void _Scheduler_priority_Ready_queue_update(
  Scheduler_priority_Ready_queue *ready_queue,
  unsigned int                    new_priority,
  Priority_bit_map_Control       *bit_map,
  Chain_Control                  *ready_queues
)
{
  ready_queue->current_priority = new_priority;
  ready_queue->ready_chain = &ready_queues[ new_priority ];

  _Priority_bit_map_Initialize_information(
    bit_map,
    &ready_queue->Priority_map,
    new_priority
  );
}

static inline void _Scheduler_priority_Ready_queue_enqueue(
  Chain_Node                     *node,
  Scheduler_priority_Ready_queue *ready_queue,
  Priority_bit_map_Control       *bit_map
)
{
  Chain_Control *ready_chain = ready_queue->ready_chain;

  _Chain_Append_unprotected( ready_chain, node );
  _Priority_bit_map_Add( bit_map, &ready_queue->Priority_map );
}
Blocking a task is the reverse of making it ready: _Scheduler_priority_Ready_queue_extract removes the task from its ready queue, and if it was the only node on the chain, the corresponding bitmap bits are cleared as well:
static inline void _Scheduler_priority_Ready_queue_extract(
  Chain_Node                     *node,
  Scheduler_priority_Ready_queue *ready_queue,
  Priority_bit_map_Control       *bit_map
)
{
  Chain_Control *ready_chain = ready_queue->ready_chain;

  if ( _Chain_Has_only_one_node( ready_chain ) ) {
    _Chain_Initialize_empty( ready_chain );
    _Chain_Initialize_node( node );
    _Priority_bit_map_Remove( bit_map, &ready_queue->Priority_map );
  } else {
    _Chain_Extract_unprotected( node );
  }
}
Scheduling itself is just taking the highest-priority task off the ready queues and marking it for execution; the corresponding function is _Scheduler_priority_Get_highest_ready:
static inline Thread_Control *_Scheduler_priority_Get_highest_ready(
  const Scheduler_Control *scheduler
)
{
  Scheduler_priority_Context *context =
    _Scheduler_priority_Get_context( scheduler );

  return (Thread_Control *) _Scheduler_priority_Ready_queue_first(
    &context->Bit_map,
    &context->Ready[ 0 ]
  );
}
_Scheduler_priority_Ready_queue_first takes the head node of the selected queue.
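Putting the bitmap and the chains together, the whole O(1) pick can be modeled in a few lines. The Task type, ready_head array, and get_highest_ready are invented stand-ins for Chain_Control, Ready[], and _Scheduler_priority_Ready_queue_first; map_highest is hard-coded here so the example stays self-contained (see the bitmap sketch earlier for a real lookup).

#include <stdio.h>

typedef struct Task {
  const char  *name;
  struct Task *next;
} Task;

static Task *ready_head[ 256 ]; /* one FIFO per priority level */

static unsigned map_highest( void ) { return 4; } /* bitmap lookup, stubbed */

static Task *get_highest_ready( void )
{
  /* O(1) bitmap lookup, then O(1) chain-head read. */
  return ready_head[ map_highest() ];
}

int main( void )
{
  static Task ta1 = { "TA1", 0 };
  ready_head[ 4 ] = &ta1; /* TA1 is ready at priority 4 */
  printf( "%s\n", get_highest_ready()->name ); /* prints TA1 */
  return 0;
}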
A priority update takes four steps, with the implementation right after the list:
1. Extract the task from its current ready queue.
2. Update the ready queue and bitmap information for the new priority.
3. If the new priority requests append semantics, enqueue the task at the tail.
4. Otherwise, enqueue it at the head of the ready queue.
_Scheduler_priority_Ready_queue_extract(
  &the_thread->Object.Node,
  &the_node->Ready_queue,
  &context->Bit_map
);

_Scheduler_priority_Ready_queue_update(
  &the_node->Ready_queue,
  unmapped_priority,
  &context->Bit_map,
  &context->Ready[ 0 ]
);

if ( SCHEDULER_PRIORITY_IS_APPEND( new_priority ) ) {
  _Scheduler_priority_Ready_queue_enqueue(
    &the_thread->Object.Node,
    &the_node->Ready_queue,
    &context->Bit_map
  );
} else {
  _Scheduler_priority_Ready_queue_enqueue_first(
    &the_thread->Object.Node,
    &the_node->Ready_queue,
    &context->Bit_map
  );
}
XI. Summary

To conclude: the priority scheduling algorithm is built on a bitmap plus per-priority queues. It dispatches tasks by computing the highest ready priority, and its overall complexity is O(1).
This scheduling algorithm is preemptive, and tasks at the same priority are served in FIFO order through the queues.
As discussed earlier, RTEMS can actively reschedule under certain conditions; likewise, under other conditions a task can call yield to give up the processor. Yielding differs from active scheduling in that the yielding task voluntarily gives up the CPU and is re-queued behind its equal-priority peers, resuming only after they have had a chance to run.
Yield is implemented by _Scheduler_Yield. It is built on the same machinery as _Scheduler_Schedule, but with one difference: schedule does not modify the ready queue, while yield extracts the calling task from its ready queue and appends it again at the tail. So whenever a task yields, it always ends up at the back of its queue.
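A minimal standalone model of this behavior (a plain array stands in for the RTEMS ready chain; all names here are invented for illustration):

#include <stdio.h>

/* Model of yield on a FIFO ready queue: the running task is extracted and
   re-appended at the tail, so equal-priority peers run first. */
#define N 3
static const char *queue[ N ] = { "TA1", "TA2", "TA3" };

static void yield_first( void )
{
  const char *head = queue[ 0 ];

  for ( int i = 0; i + 1 < N; ++i ) /* extract head, shift the rest forward */
    queue[ i ] = queue[ i + 1 ];
  queue[ N - 1 ] = head;            /* re-append at the tail */
}

int main( void )
{
  yield_first(); /* TA1 yields */
  for ( int i = 0; i < N; ++i )
    printf( "%s ", queue[ i ] ); /* prints: TA2 TA3 TA1 */
  printf( "\n" );
  return 0;
}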
Yield can be tested in many scenarios, as long as the test demonstrates the difference from active scheduling: a yield takes the task out of the ready queue and re-inserts it. This article again uses the signal test to verify yielding. The function involved is:
rtems_status_code rtems_task_wake_after(
  rtems_interval ticks
)
{
  /*
   * It is critical to obtain the executing thread after thread dispatching is
   * disabled on SMP configurations.
   */
  Thread_Control  *executing;
  Per_CPU_Control *cpu_self;

  cpu_self = _Thread_Dispatch_disable();
  executing = _Per_CPU_Get_executing( cpu_self );

  if ( ticks == 0 ) {
    _Thread_Yield( executing );
  } else {
    _Thread_Set_state( executing, STATES_WAITING_FOR_TIME );
    _Thread_Wait_flags_set( executing, THREAD_WAIT_STATE_BLOCKED );
    _Thread_Add_timeout_ticks( executing, cpu_self, ticks );
  }

  _Thread_Dispatch_direct( cpu_self );
  return RTEMS_SUCCESSFUL;
}
As shown, if ticks is nonzero the task waits on a timeout; if it is 0, the task yields. Based on this, the test code is:
rtems_asr Process_asr(
  rtems_signal_set the_signal_set
)
{
  rtems_status_code status;

  printf( "ASR - ENTRY - signal => %08" PRIxrtems_signal_set "\n", the_signal_set );

  switch( the_signal_set ) {
    case RTEMS_SIGNAL_16:
    case RTEMS_SIGNAL_17:
    case RTEMS_SIGNAL_18 | RTEMS_SIGNAL_19:
      break;
    case RTEMS_SIGNAL_0:
    case RTEMS_SIGNAL_1:
      puts( "ASR - rtems_task_wake_after - yield processor" );
      status = rtems_task_wake_after( RTEMS_YIELD_PROCESSOR );
      directive_failed( status, "rtems_task_wake_after yield" );
      break;
    case RTEMS_SIGNAL_3:
      Asr_fired = TRUE;
      break;
  }

  printf( "ASR - EXIT - signal => %08" PRIxrtems_signal_set "\n", the_signal_set );
}
This code goes together with the test code from the active-scheduling article; only the implementation of the signal callback Process_asr is shown here.
As we can see, for signals 0 and 1 the ASR yields the CPU, letting other tasks run normally.
To show two tasks yielding to each other, the test has Task_2 send a signal and then actively yield as well:
rtems_task Task_2(
  rtems_task_argument argument
)
{
  rtems_status_code status;

  puts( "TA2 - rtems_signal_send - RTEMS_SIGNAL_17 to TA1" );
  status = rtems_signal_send( Task_id[ 1 ], RTEMS_SIGNAL_17 );
  directive_failed( status, "rtems_signal_send" );

  puts( "TA2 - rtems_task_wake_after - yield processor" );
  status = rtems_task_wake_after( RTEMS_YIELD_PROCESSOR );
  directive_failed( status, "rtems_task_wake_after" );

  puts("TA2 - rtems_signal_send - RTEMS_SIGNAL_18 and RTEMS_SIGNAL_19 to TA1");
  status = rtems_signal_send( Task_id[ 1 ], RTEMS_SIGNAL_18 | RTEMS_SIGNAL_19 );
  directive_failed( status, "rtems_signal_send" );

  TEST_END();
  rtems_test_exit( 0 );
}
Task_2 sends RTEMS_SIGNAL_17, yields, and then sends signals 18 and 19. The resulting log:
TA1 - rtems_signal_catch - RTEMS_INTERRUPT_LEVEL( 0 )
TA1 - rtems_signal_send - RTEMS_SIGNAL_16 to self
ASR - ENTRY - signal => 00010000
ASR - EXIT - signal => 00010000
TA1 - rtems_signal_send - RTEMS_SIGNAL_0 to self
ASR - ENTRY - signal => 00000001
ASR - rtems_task_wake_after - yield processor
TA2 - rtems_signal_send - RTEMS_SIGNAL_17 to TA1
TA2 - rtems_task_wake_after - yield processor
ASR - ENTRY - signal => 00020000
ASR - EXIT - signal => 00020000
ASR - EXIT - signal => 00000001
From this log: after TA1 sends RTEMS_SIGNAL_0, its ASR yields, TA2 then gets to run, sends RTEMS_SIGNAL_17 to TA1 as shown, and yields in turn. The resulting order inside Process_asr is as follows.
TA1 first handles RTEMS_SIGNAL_0 and yields inside the ASR, hence the log line ASR - ENTRY - signal => 00000001. Once RTEMS_SIGNAL_17 has been delivered, the ASR handler logs ASR - ENTRY - signal => 00020000 and ASR - EXIT - signal => 00020000. Finally, because a yield re-inserts the task at the tail of the ready queue, the last line printed is:
ASR - EXIT - signal => 00000001
With the test code and log above, the yield logic is now clear: a yielding task gives up the CPU and inserts itself at the tail of the ready queue.