Главная
Вычислительные ресурсы
C чего начать
Вопросы - ответы
Документация
Исследования
Контакты

Начнем с измерения быстродействия того, что у нас получилось по умолчанию, безо всяких специальных усилий по оптимизации. Гибридно - параллельная вычислительная установка, на которой будем проводить исследования, состоит из единственного вычислительного узла.

Состав узла:

Из приведенной здесь комплектации вычислительного узла следует, что на процессоре могут выполняться параллельно, давая скоростной выигрыш, не более двух процессов вычислительного характера.

Из состава сопроцессорного блока имеет смысл использовать одновременно, при решении одной задачи, не более двух, максимум - четырех ПЛИС, поскольку максимальная скорость передачи данных в один сопроцессор (одну ПЛИС) - 800 мегабайт в секунду, а максимальная теоретически возможная суммарная скорость передачи данных в весь сопроцессорный блок - около 2000 мегабайт в секунду.

В качестве теста быстродействия, будем выполнять 100 итераций нашего модельного приложения на сетке размером 4000х4000.

Для начала сравним между собой разные варианты программной реализации.

Наиболее простой вариант описан в Разделе 3 настоящего документа. В нем не используется искусственное расширение теневой грани, а итерации выполняются методом двух массивов (то же самое получится в варианте из Раздела 4, если положить коэффициент расширения теневой грани равным единице). Времена исполнения теста для этого варианта программы таковы:

Вариант из Раздела 5, с коэффициент расширения теневой грани, равным единице, отличается тем, что итерации выполняются методом буферизации строк. Для этого варианта времена таковы:

Этот вариант - самый эффективный из программных.

Прежде, чем приступать к измерениям программно - аппаратных вариантов, исследуем программный вариант мелкоблочной обработки, описанный в Разделе 7 настоящего документа. Следующие значения констант, характеризующих разбиение, позволяет нам использовать практически всю память одной ПЛИС, и задавать коэффициент расширения теневой грани в диапазоне от 1 до 4 включительно, поскольку (12+2*4)*5000=100000:

  PORTION    12
  MAXMY     5000
  LOCALMEM  100000
(на самом деле, значение PORTION для нашей задачи можно было бы задать равным 17, поскольку реальный размер строки у нас 4000, а (17+2*4)*4000=100000, но сейчас это не так важно - нам пока требуется первоначальная прикидка).

Для этого варианта времена таковы:

При коэффициенте расширения теневой грани, равном единице:

При коэффициенте расширения теневой грани, равном двум:

Видим, что при программной реализации расширение теневой грани никакой пользы не дает, но и вред от нее незначителен. Сам же факт мелкоблочного разбиения заметно замедлил программу: 9.1с вместо 6.6с, почти в 1.4 раза.

Теперь проведем хронометраж программно - аппаратного варианта, в котором функция обработки мелкого блока аппаратно реализована в ПЛИС. Никаких усилий по "объяснению" компилятору из C в VHDL, как именно лучше строить схему, предпринимать не будем. Времена получатся такие:

При коэффициенте расширения теневой грани, равном единице:

При коэффициенте расширения теневой грани, равном двум:

Преодолев понятные в данном случае ужас и отчаяние, попробуем понять, что же именно мы получили. Само по себе замедление обработки на одной ПЛИС по сравнению с одним процессором примерно в 85 раз, конечно, огорчает, но удивлять не должно. С учетом всего сказанного в предыдущем разделе настоящего документа, чего-то такого и следовало ожидать. Поразительно другое. Расширение теневой грани в два раза привело не к ускорению, а к заметному замедлению обработки. Значит, лимитирующим фактором является не передача данных между процессором и сопроцессором, а именно сами вычисления внутри сопроцессора! Расширяя искусственно теневую грань, мы, действительно, добавляем сопроцессору некоторую лишнюю работу, в надежде на то, что ему гораздо легче обрабатывать данные внутри себя, чем передавать их, и потери от незначительного увеличения работы будут с лихвой перекрыты выигрышем от уменьшения объема передачи. В данном случае очевидно, что надежда не сбылась. Это означает, что качество схемы, выполняющей обработку, действительно, ОЧЕНЬ низкое, что вселяет осторожный оптимизм: плохое, как правило, можно улучшить. Чтобы окончательно убедиться в том, что измеренное нами гигантское время уходит именно на работу, а не на передачу данных, повторим последнюю серию измерений, исключив из программы все обращения к logic_put() и logic_get(). Получим то, что и ожидали:

При коэффициенте расширения теневой грани, равном единице:

При коэффициенте расширения теневой грани, равном двум:

Приведенные здесь значения - это время чистой обработки данных в сопроцессоре, плюс время запуска сопроцессора и его синхронизации с управляющей программой. Вычитая эти времена из приведенных выше, получим времена передачи данных между процессором и сопроцессором. Для случая одной ПЛИС, время передачи данных составляет, 24.4с для узкой теневой грани, и 13.3с для широкой. Расширение теневой грани вдвое, действительно, снизило затраты на передачу данных почти вдвое, но нас сейчас лимитирует не передача данных, а противоестественно медленная обработка.

Итак, наша задача - на два порядка ускорить внутреннюю обработку данных в сопроцессоре. Прежде всего, поинтересуемся, какой процент вычислительного оборудования в составе ПЛИС наша схема реально использует (о том, что и как смотреть, написано в Руководстве пользователя). Проанализировав, как рекомендует Руководство пользователя, содержимое файла utilization.rpt, убедимся, что наша схема использует 70% памяти, 2% универсальной логики, и 1.6% блоков DSP, то есть кристалл практически пуст. Объем используемой памяти следует увеличить - это позволит нам улучшить отношение между полезной и "лишней" работой внутри мелкого блока. Затем все усилия следует сосредоточить на "убеждении" транслятора из C в VHDL в необходимости использовать побольше вычислительных ресурсов ПЛИС для ускорения обработки. Из полученных нами значений быстродействия вполне очевидно, что транслятор сделал именно то, чего мы опасались: изготовил схему, выполняющую вычисления настолько последовательно, насколько это мог бы делать процессор общего назначения, оснащенный единственным арифметическим устройством.

Увеличиваем LOCALMEM до 120000, задаем STW равным 4, тогда PORTION можно сделать равным 22, поскольку (22+2*4)*4000 = 120000.

Теперь нам предстоит сопоставить две вещи: наши пожелания по построению схемы, и имеющиеся в нашем распоряжении прагмы.

Пожеланий у нас - всего три:
—  реализовать вычисление нового значения температуры в точке по старым значениям в соседних точках в конвейерном режиме. Это позволит нам получать на каждом такте рабочей частоты по одному полностью готовому новому значению, невзирая на сложность формулы (которая, увы, совсем невелика в нашем случае);
—  по возможности расширить конвейер до такого количества независимо работающих "ниток", какое уместится в кристалле. Это позволит нам получать на каждом такте рабочей частоты не по одному полностью готовому новому значению, а по столько полностью готовых новых значений, сколько "ниток" будет у нашего конвейера;
—  расслоить память используемых в программе массивов на столько слоев, сколько у конвейера "ниток", чтобы "нитки" конвейера действительно могли одновременно выбирать из памяти данные и записывать в нее результаты на каждом такте, не мешая друг другу.

Попробуем, кстати, грубо прикинуть число "ниток" конвейера, необходимое для "обгона" нашего процессора. В нашем тестовом приложении мы вычисляем на каждой итерации примерно 16 млн. новых значений, за 100 итераций это 1.6 миллиарда. Наша рабочая частота - 100 млн. тактов в секунду. Чтобы вычислить 1.6 миллиарда значений при такой частоте, например, за 1 секунду, нам надо получать на каждом такте по 16 новых значений. Это возможно, если нам удастся реализовать все наши пожелания, высказанные выше, при количестве "ниток" конвейера, равным 16. В действительности быстродействие будет, даже без учета затрат времени на передачу данных между процессором и сопроцессором, несколько ниже, поскольку мы в нашей прикидке молчаливо предполагали, что конвейер никогда не разгоняется и не тормозит, чего, очевидно, на практике быть не может.

Прагмы в нашем распоряжении имеются следующие:

#pragma HLS pipeline

Будучи указана сразу после открывающей фигурной скобки цикла, предписывает строить для тела этого цикла схему, работающую в конвейерном режиме.

#pragma HLS unroll factor=<N>

Будучи указана сразу после открывающей фигурной скобки цикла, предписывает строить для тела этого цикла схему, выполняющую витки цикла порциями по <N> штук одновременно.

#pragma HLS array_partition variable=<arr> cyclic factor=<N> dim=<D>

Будучи указана сразу после объявления массива <arr>, предписывает циклически расслоить его память на <N> слоев по измерению номер <D>.

Приведенные прагмы в точности соответствуют нашим пожеланиям. В действительности, прагм, реализованных в рассматриваемом компиляторе, гораздо больше, и мы при необходимости будем использовать некоторые другие прагмы в дальнейшем изложении, объясняя по ходу их смысл.

Попробуем вставить эти прагмы в наш исходный текст. Строго говоря, он не совсем "наш": по сравнению с тем вариантом itstep_accel(), на котором мы остановились выше, в приведенный ниже текст внесены два изменения. Мы снова сделали массивы f и r двумерными, и добавили к массивам lower и middle еще один промежуточный массив upper, чтобы избавиться в критическом цикле от чтения массива f. В остальном текст не изменился.

Начнем с конвейеризации:

#include "dimension.h"
#define MAXMY 5000
#define LOCALMEM 120000
#ifdef FOR_CPU
#include <stdio.h>
#include "comm_cpu.c"
    int itstep_accel_test( int my_number, int n_of_nodes, int mx, int my, 
                                                int stw, int portion )
{
    if ( ((portion+2*stw)*my) > LOCALMEM )
     {
      fprintf( stderr, 
 "((portion(%d)+2*stw(%d))*my(%d))>localmem(%d), no memory for arrays in accel\n", 
                             portion, stw, my, LOCALMEM );
      return 1;
     }
    if ( my > MAXMY )
     {
      fprintf( stderr, " my (%d) greater than maxmy (%d)\n", my, MAXMY );
      return 1; 
     }
    else return 0;
}
#endif

/*--vector_proc_64
void itstep_accel(
double pf,      : ram0,inout, wblock=1,size=120000
double pr,      : ram1,in, wblock=1,size=120000
long mx,       : reg0
long my,      : reg1
double rdx2,      : reg2
double rdy2,      : reg3
double beta,      : reg4
long niter      : reg5
)--*/

  void itstep_accel( long mx, long my, 
                                        double pf[LOCALMEM], double pr[LOCALMEM], 
                                        double rdx2, double rdy2, double beta, long niter )
{
    DIM2( double, f, my ) = (typeof(f))pf;
    DIM2( double, r, my ) = (typeof(r))pr;
    static double  lower[MAXMY];
    static double middle[MAXMY];
    static double  upper[MAXMY];
    int i, j, k, mx1, my1;
/***/
    mx1 = mx - 1;
    my1 = my - 1;
    for ( k = 0; k < niter; k++ )
     {
      for ( j = 0; j < my; j++ )
       {
#pragma HLS pipeline
        upper[j] = f[0][j];
       }
      for ( i = 0; i < mx1; i++ )
       {
        for ( j = 0; j < my; j++ )
         {
#pragma HLS pipeline
          lower[j] = middle[j];
          middle[j] = upper[j];
          upper[j] = f[i+1][j];
         } 
        if ( i )
         {
     for ( j = 1; j < my1; j++ )
      {
#pragma HLS pipeline
                f[i][j] = ((lower[j]+upper[j])*rdx2+(middle[j-1]+middle[j+1])*rdy2
                                       -r[i][j])*beta;
               }
           }
       }
     }
}

Судя по следующему фрагменту файла vivado_hls.log, транслятор построил конвейерную схему:

@I [HLS-10] -- Scheduling module 'itstep_accel' 
@I [HLS-10] ----------------------------------------------------------------
@I [SCHED-11] Starting scheduling ...
@I [SCHED-61] Pipelining loop 'Loop 1.1'.
@I [SCHED-61] Pipelining result: Target II: 1, Final II: 1, Depth: 2.
@I [SCHED-61] Pipelining loop 'Loop 1.2.1'.
@I [SCHED-61] Pipelining result: Target II: 1, Final II: 1, Depth: 2.
@I [SCHED-61] Pipelining loop 'Loop 1.2.2'.
@I [SCHED-61] Pipelining result: Target II: 1, Final II: 1, Depth: 28.
@I [SCHED-11] Finished scheduling.

Обозначение "II" во фразах "Target II" в этом тексте означает "Initialization Interval", то есть интервал между подачей очередных порций данных на конвейер. Значение II, равное 1, означает, что данные подаются на каждом такте, то есть конвейер построен. К сожалению, существенного ускорения эта схема по сравнению с первоначальной не обнаруживает, поскольку у построенного в ней конвейера всего одна "нитка".

ЗАМЕЧАНИЕ. При интенсивном экспериментировании с прагмами, обычно хочется побыстрее получить протокол синтеза в файле vivado_hls.log, по которому зачастую все ясно и так, без трассировки и реального запуска сопроцессора. Для получения файла vivado_hls.log достаточно выполнить команду hls (см. Руководство пользователя), которая, в отличие от полноценного синтеза, срабатывает обычно за несколько секунд. Конец замечания.

Теперь попробуем развернуть циклы и расслоить память:

#include "dimension.h"
#define MAXMY 5000
#define LOCALMEM 120000
#ifdef FOR_CPU
#include <stdio.h>
#include "comm_cpu.c"
    int itstep_accel_test( int my_number, int n_of_nodes, int mx, int my, 
                                                int stw, int portion )
{
    if ( ((portion+2*stw)*my) > LOCALMEM )
     {
      fprintf( stderr, 
"((portion(%d)+2*stw(%d))*my(%d))>localmem(%d), no memory for arrays in accel\n", 
                            portion, stw, my, LOCALMEM );
      return 1;
     }
    if ( my > MAXMY )
     {
      fprintf( stderr, " my (%d) greater than maxmy (%d)\n", my, MAXMY );
      return 1; 
     }
    else return 0;
}
#endif

/*--vector_proc_64
void itstep_accel(
double pf,      : ram0,inout, wblock=8,size=120000
double pr,      : ram1,in, wblock=8,size=120000
long mx,       : reg0
long my,      : reg1
double rdx2,      : reg2
double rdy2,      : reg3
double beta,      : reg4
long niter      : reg5
)--*/

    void itstep_accel( long mx, long my, 
                                double pf[LOCALMEM], double pr[LOCALMEM], 
                                double rdx2, double rdy2, double beta, long niter )
{
#pragma HLS array_partition variable=pf cyclic factor=8
#pragma HLS array_partition variable=pr cyclic factor=8
    DIM2( double, f, my ) = (typeof(f))pf;
    DIM2( double, r, my ) = (typeof(r))pr;
    static double  lower[MAXMY];
#pragma HLS array_partition variable=lower cyclic factor=8
    static double middle[MAXMY];
#pragma HLS array_partition variable=middle cyclic factor=8
    static double  upper[MAXMY];
#pragma HLS array_partition variable=upper cyclic factor=8
    int i, j, k, mx1, my1;
/***/
    mx1 = mx - 1;
    my1 = my - 1;
    for ( k = 0; k < niter; k++ )
     {
      for ( j = 0; j < my; j++ )
       {
#pragma HLS unroll factor=8
        middle[j] = f[0][j];
        upper[j] = f[1][j];
       }
      for ( i = 1; i < mx1; i++ )
       {
        for ( j = 0; j < my; j++ )
         {
#pragma HLS unroll factor=8
          lower[j] = middle[j];
          middle[j] = upper[j];
          upper[j] = f[i+1][j];
         } 
        for ( j = 1; j < my1; j++ )
         {
#pragma HLS unroll factor=8
          f[i][j] = ((lower[j]+upper[j])*rdx2+(middle[j-1]+middle[j+1])*rdy2
                                -r[i][j])*beta;
         }
       }
     }
}

ВНИМАНИЕ!!! Не забываем в псевдокомментарии перед текстом функции itstep_accel() указать для массивов f и r векторность, равную теперь не единице, а восьми.

Анализ текста в vivado_hls.log показывает, что мы своего снова добились:

@I [XFORM-501] Unrolling loop 'Loop-1.1' (itstep_accel.c:53) in function 'itstep_accel'
               partially with a factor of 8.
@I [XFORM-501] Unrolling loop 'Loop-1.2.1' (itstep_accel.c:61) in function 'itstep_accel'
               partially with a factor of 8.
@I [XFORM-501] Unrolling loop 'Loop-1.2.2' (itstep_accel.c:68) in function 'itstep_accel'
               partially with a factor of 8.
@I [XFORM-101] Partitioning array 'pf' (itstep_accel.c:35) in dimension 1 with a cyclic
               factor 8.
@I [XFORM-101] Partitioning array 'lower'  in dimension 1 with a cyclic factor 8.
@I [XFORM-101] Partitioning array 'middle'  in dimension 1 with a cyclic factor 8.
@I [XFORM-101] Partitioning array 'upper'  in dimension 1 with a cyclic factor 8.
@I [XFORM-101] Partitioning array 'pr' (itstep_accel.c:35) in dimension 1 with a cyclic
               factor 8.

К сожалению, запуск и этого сопроцессора показывает, что существенного ускорения нет. Очевидно, нам необходимо задействовать оба приема одновременно: и развертку циклов с расслоением памяти, и конвейеризацию. Казалось бы, нет ничего проще:

#include "dimension.h"
#define MAXMY 5000
#define LOCALMEM 120000
#ifdef FOR_CPU
#include &lt;stdio.h&gt;
#include "comm_cpu.c"
    int itstep_accel_test( int my_number, int n_of_nodes, int mx, int my, 
                                                int stw, int portion )
{
    if ( ((portion+2*stw)*my) &gt; LOCALMEM )
     {
      fprintf( stderr, 
"((portion(%d)+2*stw(%d))*my(%d))&gt;localmem(%d), no memory for arrays in accel\n", 
                            portion, stw, my, LOCALMEM );
      return 1;
     }
    if ( my &gt; MAXMY )
     {
      fprintf( stderr, " my (%d) greater than maxmy (%d)\n", my, MAXMY );
      return 1; 
     }
    else return 0;
}
#endif

/*--vector_proc_64
void itstep_accel(
double pf,      : ram0,inout, wblock=8,size=120000
double pr,      : ram1,in, wblock=8,size=120000
long mx,       : reg0
long my,      : reg1
double rdx2,      : reg2
double rdy2,      : reg3
double beta,      : reg4
long niter      : reg5
)--*/

    void itstep_accel( long mx, long my, 
                                double pf[LOCALMEM], double pr[LOCALMEM], 
                                double rdx2, double rdy2, double beta, long niter )
{
#pragma HLS array_partition variable=pf cyclic factor=8
#pragma HLS array_partition variable=pr cyclic factor=8
    DIM2( double, f, my ) = (typeof(f))pf;
    DIM2( double, r, my ) = (typeof(r))pr;
    static double  lower[MAXMY];
#pragma HLS array_partition variable=lower cyclic factor=8
    static double middle[MAXMY];
#pragma HLS array_partition variable=middle cyclic factor=8
    static double  upper[MAXMY];
#pragma HLS array_partition variable=upper cyclic factor=8
    int i, j, k, mx1, my1;
/***/
    mx1 = mx - 1;
    my1 = my - 1;
    for ( k = 0; k &lt; niter; k++ )
     {
      for ( j = 0; j &lt; my; j++ )
       {
#pragma HLS unroll factor=8
#pragma HLS pipeline
        middle[j] = f[0][j];
        upper[j] = f[1][j];
       }
      for ( i = 1; i &lt; mx1; i++ )
       {
        for ( j = 0; j &lt; my; j++ )
         {
#pragma HLS unroll factor=8
#pragma HLS pipeline
          lower[j] = middle[j];
          middle[j] = upper[j];
          upper[j] = f[i+1][j];
         } 
        for ( j = 1; j &lt; my1; j++ )
         {
#pragma HLS unroll factor=8
#pragma HLS pipeline
          f[i][j] = ((lower[j]+upper[j])*rdx2+(middle[j-1]+middle[j+1])*rdy2
                                -r[i][j])*beta;
         }
       }
     }
}
К сожалению, и этот сопроцессор остается почти таким же медленным, как и все предыдущие. Чтобы понять, в чем дело, посмотрим на текст в файле vivado_hls.log:
@I [XFORM-501] Unrolling loop 'Loop-1.1' (itstep_accel.c:53) in function 'itstep_accel'
               partially with a factor of 8.
@I [XFORM-501] Unrolling loop 'Loop-1.2.1' (itstep_accel.c:62) in function 'itstep_accel'
               partially with a factor of 8.
@I [XFORM-501] Unrolling loop 'Loop-1.2.2' (itstep_accel.c:70) in function 'itstep_accel'
               partially with a factor of 8.
@I [XFORM-101] Partitioning array 'pf' (itstep_accel.c:35) in dimension 1 with a cyclic
               factor 8.
@I [XFORM-101] Partitioning array 'lower'  in dimension 1 with a cyclic factor 8.
@I [XFORM-101] Partitioning array 'middle'  in dimension 1 with a cyclic factor 8.
@I [XFORM-101] Partitioning array 'upper'  in dimension 1 with a cyclic factor 8.
@I [XFORM-101] Partitioning array 'pr' (itstep_accel.c:35) in dimension 1 with a cyclic
               factor 8.

.............................................. (несущественная часть текста пропущена)
@I [HLS-10] ----------------------------------------------------------------
@I [HLS-10] -- Scheduling module 'itstep_accel' 
@I [HLS-10] ----------------------------------------------------------------
@I [SCHED-11] Starting scheduling ...
@I [SCHED-61] Pipelining loop 'Loop 1.1'.
@W [SCHED-69] Unable to schedule 'load' operation ('pf_7_load_2', itstep_accel.c:58)
              on array 'pf_7' due to limited memory ports.
@I [SCHED-61] Pipelining result: Target II: 1, Final II: 5, Depth: 6.
@I [SCHED-61] Pipelining loop 'Loop 1.2.1'.
@W [SCHED-69] Unable to schedule 'load' operation ('pf_7_load_11', itstep_accel.c:68)
              on array 'pf_7' due to limited memory ports.
@I [SCHED-61] Pipelining result: Target II: 1, Final II: 4, Depth: 6.
@I [SCHED-61] Pipelining loop 'Loop 1.2.2'.
@W [SCHED-68] Unable to enforce a carried dependency constraint (II = 1, distance = 1)
              between 'store' operation (itstep_accel.c:74) of variable 'tmp_23_1'
              on array 'pf_6' and 'store' operation (itstep_accel.c:74)
              of variable 'tmp_24' on array 'pf_6'.
@W [SCHED-68] Unable to enforce a carried dependency constraint (II = 2, distance = 1)
              between 'store' operation (itstep_accel.c:74) of variable 'tmp_23_2'
              on array 'pf_6' and 'store' operation (itstep_accel.c:74) of variable
              'tmp_24' on array 'pf_6'.
@W [SCHED-68] Unable to enforce a carried dependency constraint (II = 3, distance = 1)
              between 'store' operation (itstep_accel.c:74) of variable 'tmp_23_3'
              on array 'pf_6' and 'store' operation (itstep_accel.c:74) of variable
              'tmp_24' on array'pf_6'.
@W [SCHED-68] Unable to enforce a carried dependency constraint (II = 4, distance = 1)
              between 'store' operation (itstep_accel.c:74) of variable 'tmp_23_4'
              on array 'pf_6' and 'store' operation (itstep_accel.c:74) of variable
              'tmp_24' on array 'pf_6'.
@W [SCHED-68] Unable to enforce a carried dependency constraint (II = 7, distance = 1)
              between 'store' operation (itstep_accel.c:74) of variable 'tmp_23_7'
              on array 'pf_6' and 'store' operation (itstep_accel.c:74) of variable
              'tmp_24' on array 'pf_6'.
@I [SCHED-61] Pipelining result: Target II: 1, Final II: 8, Depth: 41.
@I [SCHED-11] Finished scheduling.
@I [HLS-111] Elapsed time: 6.95 seconds; current memory usage: 79 MB.
@I [HLS-10] ----------------------------------------------------------------

В первой части текста видим, что расслоение массивов и развертывание циклов прошли успешно. Зато во второй части - после строки:

@I [HLS-10] -- Scheduling module 'itstep_accel'
наблюдаем какой-то кошмар. Компилятор категорически отказывается строить конвейер с интервалом подачи данных меньше восьми тактов. При этом он жалуется одновременно на несуществующие, по крайней мере, на первый взгляд, зависимости выполняемых в критическом цикле операций записи элементов массива pf друг от друга, и на нехватку портов памяти для того же массива, хотя, казалось бы, больше одного порта памяти в данном случае категорически не требуется. Никакие усилия по добавлению прагм, принудительно отключающих учет зависимостей, или принудительно заставляющих компилятор использовать для реализации массивов двухпортовую память, или даже по разбиению массива middle на middle1 и middle2, к успеху не приводят. Ссылаясь на мифические зависимости, и столь же мифическую нехватку портов памяти, компилятор категорически отказывается делать то, чего мы от него хотим. Точно так же ведет себя и другая версия компилятора.

Чего мы не заметили? Понять это одновременно и сложно, и очень просто. Строя в своей голове идеальный образ схемы, которую хотелось бы получить, мы, конечно, молчаливо предполагаем, что каждая строка массива f, обработку которой мы хотели бы одновременно развернуть по восьмеркам элементов и конвейеризовать, начинается с нулевого элемента восьмерки. Иными словами, что размер строки массива f делится нацело на 8. Сказать об этом компилятору мы, конечно, забыли. Впрочем, изучение описания компилятора показывает, что именно «сказать» это на языке прагм просто невозможно - подходящих прагм не существует в природе. В итоге компилятор пытается изготовить схему, строго предписанную исходным текстом на С: схему, в которой обработка строки массива f в критическом цикле начинается с произвольного, не известного в момент компиляции, и всякий раз - разного, номера элемента восьмерки, и при этом схема является развернутой в восемь «ниток» и конвейерной! От необходимости построить ТАКУЮ схему у любого схемотехника волосы встанут дыбом. Схема эта - с неизбежностью очень ресурсоемкая и очень медленная, действительно, создающая массу лишних зависимостей на ровном месте, и требующая массы портов памяти. Очевидно, нам теперь придется с совершенно новыми силами искать способ объяснить - таки компилятору, чего мы в действительности от него хотим. Впрочем, главная беда - не в этом. Главная беда - в том, что для понимания, на чем именно «споткнулся» компилятор, разработчику, в данном случае, совершенно необходимо обладать вполне заметным опытом в вычислительной схемотехнике, и весьма желательно - еще и некоторым опытом в разработке компиляторов. То есть в том, что свойство схемы, полученной из текста на аннотированном С, невозможно объяснить в терминах аннотированного С. Но об этом - позже.

Идея текста, из которого может получиться правильная схема, теперь в общих чертах понятна. Пусть мы хотим построить конвейер из N "ниток", и для этого, соответственно, расслоить обрабатываемые массивы на N слоев. Тогда необходимо явно придать обрабатываемым массивам дополнительную младшую размерность, размером N, то есть рассматривать эти массивы не как массивы вещественных чисел, а как массивы N-ок вещественных чисел. Соответственно, во всех циклах обработки строк, перебираться должны теперь не элементы строк, а N-ки элементов строк, и везде добавляется еще по одному, самому внутреннему, циклу из N витков. В этом, самом внутреннем, цикле перебираются элементы N-ки - отдельные вещественные числа. Именно этот цикл мы предложим компилятору развернуть, а цикл, непосредственно объемлющий его - конвейеризовать.

Чтобы не усложнять индексацию массивов, рискуя запутать компилятор, возьмем в качестве исходного материала для приведения в указанный вид ту версию текста itstep_accel() (из Раздела 7 настоящего документа), в которой массивы f и r рассматриваются как одномерные. Нам придется сделать их двумерными, с размером по младшей размерности, равным N, все циклы обхода строк массивов сделать в N раз короче, но зато внутрь этих циклов вставить дополнительные циклы из N витков каждый.

Приведем пример текста, построенного таким образом, из которого получается правильная схема:

#include "dimension.h"
#define MAXMY 5008
#define LOCALMEM 120000
#define FACTOR 16
#ifdef FOR_CPU
#include <stdio.h>
#include "comm_cpu.c"
    int itstep_accel_test( int my_number, int n_of_nodes, int mx, int my, 
                                                int stw, int portion )
{
    if ( ((portion+2*stw)*my) > LOCALMEM )
     {
      fprintf( stderr, 
"((portion(%d)+2*stw(%d))*my(%d))>localmem(%d), no memory for arrays in accel\n", 
                            portion, stw, my, LOCALMEM );
      return 1;
     }
    if ( my > MAXMY )
     {
      fprintf( stderr, " my (%d) greater than maxmy (%d)\n", my, MAXMY );
      return 1; 
     }
    if ( my % FACTOR )
     {
      fprintf( stderr, " my (%d) is not divisible by blocking factor (%d)\n", 
                             my, FACTOR );
      return 1;
     }
    else return 0;
}
#endif

/*--vector_proc_64
void itstep_accel(
double pf,      : ram0,inout, wblock=16,size=120000
double pr,      : ram1,in, wblock=16,size=120000
long mx,       : reg0
long my,      : reg1
double rdx2,      : reg2
double rdy2,      : reg3
double beta,      : reg4
long niter      : reg5
)--*/
#ifdef FOR_CPU
    void itstep_accel( long mx, long my, 
                                           double f[LOCALMEM], double r[LOCALMEM], 
                                 double rdx2, double rdy2, double beta, long niter )
{
    DIM2( double, pf, FACTOR ) = (typeof(pf))f;
    DIM2( double, pr, FACTOR ) = (typeof(pr))r;
#else
    void itstep_accel( long mx, long my, 
                                           double pf[LOCALMEM/FACTOR][FACTOR], 
                                           double pr[LOCALMEM/FACTOR][FACTOR], 
                                 double rdx2, double rdy2, double beta, long niter )
{
#endif
#pragma HLS array_partition variable=pf cyclic factor=16 dim=2
#pragma HLS resource variable=pf core=RAM_2P
#pragma HLS array_partition variable=pr cyclic factor=16 dim=2
#pragma HLS resource variable=pr core=RAM_2P
//     DIM2( double, f, my ) = (typeof(f))pf;
//     DIM2( double, r, my ) = (typeof(r))pr;
    static double  lower[MAXMY/FACTOR][FACTOR];
#pragma HLS array_partition variable=lower cyclic factor=16 dim=2
#pragma HLS resource variable=lower core=RAM_2P
    static double middle1[MAXMY/FACTOR][FACTOR];
#pragma HLS array_partition variable=middle1 cyclic factor=16 dim=2
#pragma HLS resource variable=middle1 core=RAM_2P
    static double middle2[MAXMY/FACTOR][FACTOR];
#pragma HLS array_partition variable=middle2 cyclic factor=16 dim=2
#pragma HLS resource variable=middle2 core=RAM_2P
    static double  upper[MAXMY/FACTOR][FACTOR];
#pragma HLS array_partition variable=upper cyclic factor=16 dim=2
#pragma HLS resource variable=upper core=RAM_2P
    int i, j, k, m, k0, k1, mx1, my1;
/***/
    mx1 = mx - 1;
    my1 = my - 1;
    for ( k = 0; k < niter; k++ )
     {
      for ( j = 0; j < my/FACTOR; j++ )
       {
#pragma HLS pipeline
        for ( m = 0; m < FACTOR; m++ )
         {
#pragma HLS unroll factor=16
          middle1[j][m] = pf[j][m];
          middle2[j][m] = pf[j][m];
          upper[j][m] = pf[my/FACTOR+j][m];
         }    
       }
      k0 = 0; 
      k1 = my/FACTOR; 
      for ( i = 1; i < mx1; i++ )
       {
        k0 += my/FACTOR;
        k1 += my/FACTOR;
        for ( j = 0; j < my/FACTOR; j++ )
         {
#pragma HLS pipeline
          for ( m = 0; m < FACTOR; m++ )
           {
#pragma HLS unroll factor=16
            lower[j][m] = middle1[j][m];
            middle1[j][m] = upper[j][m];
            middle2[j][m] = upper[j][m];
            upper[j][m] = pf[k1+j][m];
           } 
         } 
        for ( j = 0; j < my/FACTOR; j++ )
         {
#pragma HLS pipeline
          if ( j > 0 )
           pf[k0+j][0] = ((lower[j][0]+upper[j][0])*rdx2
                  +(middle1[j-1][FACTOR-1]+middle2[j][1])*rdy2-pr[k0+j][0])*beta;
          for ( m = 1; m < FACTOR-1; m++ )
           {
#pragma HLS unroll factor=16-2
             pf[k0+j][m] = ((lower[j][m]+upper[j][m])*rdx2
                    +(middle1[j][m-1]+middle2[j][m+1])*rdy2-pr[k0+j][m])*beta;
           }
          if ( j < my/FACTOR-1 )  
           pf[k0+j][FACTOR-1] = ((lower[j][FACTOR-1]+upper[j][FACTOR-1])*rdx2
                  +(middle1[j][FACTOR-2]+middle2[j+1][0])*rdy2-pr[k0+j][FACTOR-1])*beta;
         }
       }
     }
}

Константа, выполняющая роль N в нашем рассуждении, предшествующем этому тексту, в этом тексте называется FACTOR. Константа FACTOR, заданная равной 16, равна коэффициенту развертывания циклов, он же - число слоев при расслоении памяти, он же - число "ниток" конвейера. В itstep_accel_test() мы теперь проверяем, делится ли my (длина строки массива f) на это значение нацело, и, если нет, отказываемся работать.

Любопытный "подарок" от компилятора вынудил нас несколько усложнить запись заголовка функции itstep_accel(), сделав ее, вопреки провозглашенным ранее принципам, разной для программной модели (#ifdef FOR_CPU) и для "настоящего" гибридно - параллельного приложения (#else):

#ifdef FOR_CPU
    void itstep_accel( long mx, long my, 
                                double f[LOCALMEM], double r[LOCALMEM], 
                                double rdx2, double rdy2, double beta, long niter )
{
    DIM2( double, pf, FACTOR ) = (typeof(pf))f;
    DIM2( double, pr, FACTOR ) = (typeof(pr))r;
#else
    void itstep_accel( long mx, long my, 
                                double pf[LOCALMEM/FACTOR][FACTOR], 
                                double pr[LOCALMEM/FACTOR][FACTOR], 
                                double rdx2, double rdy2, double beta, long niter )
{
#endif

Вариант записи для программной модели объявляет массивы - параметры f и r одномерными, чтобы не делать описание прототипа функции itstep_accel(), присутствующее, например, в файле с исходным текстом функции itstep(), зависимым от значения FACTOR, которое является "внутренним делом" функции itstep_accel(). Но внутри функции эти массивы нужны нам как двумерные, с размером по второму измерению, равному FACTOR, и мы определяем именно такие массивы pf и pr, которые и будут далее фигурировать в нашем тексте.

В варианте записи для "настоящего" гибридно - параллельного приложения мы вынуждены объявить эти массивы - параметры как двумерные непосредственно в заголовке функции. Дело в том, что такую запись, как мы использовали для программной модели, компилятор воспринимать отказывается, "говоря" в vivado_hls.log следующее:

@I [HLS-10] Checking synthesizability ...
@E [SYNCHK-41] itstep_accel.c:44: unsupported pointer reinterpretation from type
               '[120000 x double]*' to type '[16 x double]*' on variable 'f'.
@I [SYNCHK-10] 1 error(s), 0 warning(s).
@E [HLS-70] Synthesizability check failed.

Любопытно, что ранее, изготавливая наши самые первые, крайне медленные, но формально работоспособные, варианты сопроцессора, мы вполне успешно пользовались этим приемом, и компилятор все устраивало. Правда, тогда в качестве длины строки мы использовали не константу, а переменную my. В том ли здесь дело, что компилятор перестал понимать "переопределение" массивов - параметров, когда всерьез занялся оптимизацией, или в том, что, обнаружив в качестве размера по второму измерению константу, компилятор как-то особенно глубоко "осознал" всю серьезность наших намерений, для нас сейчас не очень важно. Действительно важно то, что компилятор нас, похоже, все-таки понял, приняв-таки записанную нами двумерность массива с размером по второму измерению, равному FACTOR, "всерьез".

Обратим внимание на то, что, помимо единожды заданного в начале текста значения константы FACTOR, число "16" многократно присутствует в нашем исходном тексте в явном виде. Это само по себе очень плохо из технологических соображений, но, в данном случае, неизбежно: константу FACTOR вместо явно записанного значения "16" нельзя использовать в прагмах и псевдокомментариях. При необходимости изменить значение этой константы, впредь придется прибегать к контекстному поиску строки "16", чтобы убедиться, что заменены все вхождения. Чтобы контекстный поиск показывал все необходимые строки, в самой последней прагме, в конце текста, значение "14" указано не как "14", а как "16-2".

Перейдем к прагмам, сопровождающим объявления массивов.

Все наши массивы стали теперь двумерными, с длиной строки, равной FACTOR. Объявление каждого массива сопровождается такой же парой прагм, как, например, для массива pf:

#pragma HLS array_partition variable=pf cyclic factor=16 dim=2
#pragma HLS resource variable=pf core=RAM_2P

Первая из прагм предписывает расслоить память массива по второму измерению на 16 слоев, вторая - использовать для реализации массива двухпортовую память. Не исключено, что вторая прагма - лишняя, но вреда от нее нет, поскольку внутренняя блочная память нашей ПЛИС все равно вся двухпортовая.

Дальнейший текст вполне очевиден, и не нуждается в особых комментариях, кроме текста завершающего цикла:

for ( j = 0; j < my/FACTOR; j++ )
         {
#pragma HLS pipeline
          if ( j > 0 )
           pf[k0+j][0] = ((lower[j][0]+upper[j][0])*rdx2
                               +(middle1[j-1][FACTOR-1]+middle2[j][1])*rdy2-pr[k0+j][0])*beta;
          for ( m = 1; m < FACTOR-1; m++ )
           {
#pragma HLS unroll factor=16-2
             pf[k0+j][m] = ((lower[j][m]+upper[j][m])*rdx2
                                +(middle1[j][m-1]+middle2[j][m+1])*rdy2-pr[k0+j][m])*beta;
           }
          if ( j < my/FACTOR-1 )  
           pf[k0+j][FACTOR-1] = ((lower[j][FACTOR-1]
                             +upper[j][FACTOR-1])*rdx2
                             +(middle1[j][FACTOR-2]+middle2[j+1][0])*rdy2
                             -pr[k0+j][FACTOR-1])*beta;
         }

Выписывая этот цикл, мы фактически "делаем за компилятор его работу". Нам хотелось бы в теле этого цикла сформировать m-е новое значение для j-й "шестнадцатки" значений массива pf. Однако, формирование первого и последнего по счету (точнее, нулевого и 15-го) значения "шестнадцатки" происходит не так, как формирование всех остальных. Отличие в формировании этих крайних значений в том, что элемент одного из массивов middle приходится брать не из текущей, а из соседней "шестнадцатки". При этом приходится учитывать, что у нулевой "шестнадцатки" каждой строки массива нет предыдущей, а у последней "шестнадцатки" нет следующей, и соответствующие крайние элементы вычислять не надо, поскольку это - остающиеся неизменными по ходу итераций края сетки. Поэтому вычисления крайних элементов "шестнадцатки" приходится записывать отдельно, а цикл обработки внутренних элементов "шестнадцатки" разворачивать не на 16, а на 16-2=14 витков. При этом мы надеемся, что компилятор "догадается" поместить вычисления крайних элементов шестнадцатки на две отдельных "нитки" конвейера, и он это, похоже, действительно делает.

Интересная для нас часть текста vivado_hls.log выглядит теперь так:

@I [XFORM-501] Unrolling loop 'Loop-1.1.1' (itstep_accel.c:76)
               in function 'itstep_accel': changing partial unrolling into complete unrolling
               since the unrolling factor (=16) is no less than the loop trip count (=16).
@I [XFORM-501] Unrolling loop 'Loop-1.2.1.1' (itstep_accel.c:93) in function 'itstep_accel':
               changing partial unrolling into complete unrolling since the unrolling factor
               (=16) is no less than the loop trip count (=16).
@I [XFORM-501] Unrolling loop 'Loop-1.2.2.1' (itstep_accel.c:107) in function 'itstep_accel':
               changing partial unrolling into complete unrolling since the unrolling factor
               (=14) is no less than the loop trip count (=14).
@I [HLS-10]    Checking synthesizability ...
.............................................. (несущественная часть текста пропущена)
@I [XFORM-501] Unrolling loop 'Loop-1.1.1' (itstep_accel.c:76) in function 'itstep_accel'
               completely.
@I [XFORM-501] Unrolling loop 'Loop-1.2.1.1' (itstep_accel.c:93) in function 'itstep_accel'
               completely.
@I [XFORM-501] Unrolling loop 'Loop-1.2.2.1' (itstep_accel.c:107) in function 'itstep_accel'
               completely.
@I [XFORM-101] Partitioning array 'pf' (itstep_accel.c:47) in dimension 2 completely.
@I [XFORM-101] Partitioning array 'lower'  in dimension 2 completely.
@I [XFORM-101] Partitioning array 'middle1'  in dimension 2 completely.
@I [XFORM-101] Partitioning array 'middle2'  in dimension 2 completely.
@I [XFORM-101] Partitioning array 'upper'  in dimension 2 completely.
@I [XFORM-101] Partitioning array 'pr' (itstep_accel.c:47) in dimension 2 completely.
.............................................. (несущественная часть текста пропущена)
@I [HLS-10] ----------------------------------------------------------------
@I [HLS-10] -- Scheduling module 'itstep_accel' 
@I [HLS-10] ----------------------------------------------------------------
@I [SCHED-11] Starting scheduling ...
@I [SCHED-61] Pipelining loop 'Loop 1.1'.
@I [SCHED-61] Pipelining result: Target II: 1, Final II: 1, Depth: 2.
@I [SCHED-61] Pipelining loop 'Loop 1.2.1'.
@I [SCHED-61] Pipelining result: Target II: 1, Final II: 1, Depth: 2.
@I [SCHED-61] Pipelining loop 'Loop 1.2.2'.
@I [SCHED-61] Pipelining result: Target II: 1, Final II: 1, Depth: 28.
@I [SCHED-11] Finished scheduling.
.............................................. (несущественная часть текста пропущена)

Все циклы развернуты, все конвейеры построены, значения II (темпа подачи данных) для всех конвейеров равны единице (конвейеры работают с максимально возможной частотой).

Исходные данные для тестового прогона теперь придется немного изменить. Ширину сетки теперь надо задавать равной не 4000, а 3998, чтобы после прибавления к ней двойки (суммарная ширина краев) итоговая ширина сетки делилась нацело на 16.

Измерения быстродействия такого сопроцессора сразу же показывают, что мы на верном пути. Так, вариант с не расширенной теневой гранью, ранее работавший на одном сопроцессоре 533.1с, теперь выполняется за 23.66с, причем время чистой обработки, без учета пересылки массивов между процессором и сопроцессором, составляет 2.75с. Мы пришли к тому, к чему, собственно, и стремились с самого начала: получили сопроцессор, уверенно обгоняющий процессор общего назначения по скорости обработки данных, и сделали "узким местом" пересылку этих данных между процессором и сопроцессором. Теперь мы ясно видим, что наши усилия по структурной перестройке программы с целью локализации обработки в мелких блоках, которым мы посвятили разделы с 4-го по 7-й настоящего документа, действительно, не пропали даром. Благодаря этим усилиям, у нас есть теперь возможность кратно ускорить пересылку данных. Процессор общего назначения мы сильно не обгоним, но, как отмечалось выше, в обсуждении в конце Раздела 3, мы на это особенно и не надеялись, поскольку задача у нас - модельная, и насыщенность обращений к памяти арифметическими операциями в ее критическом цикле довольно мала.

Наша схема теперь использует около 26% универсальной логики, и почти половину блоков DSP. Попытка собрать схему с конвейером из 32 ниток приводит к исчерпанию памяти инструментальной машины в процессе синтеза схемы. Впрочем, из приведенных только что результатов измерений видно, что большого смысла в дальнейшем ускорении именно обработки данных внутри сопроцессора нет. Эксперименты по построению конвейеров с числом ниток, не являющемся степенью двойки, не проводились.

Многочисленные эксперименты по многократному расширению теневой грани дали, в целом, ожидаемые результаты, но выявили одну неприятную особенность применения этого приема ускорения расчета.

Ширина расширенной теневой грани, как мы установили в Разделе 6, даже формально не может превышать трети "полезного" числа обрабатываемых строк мелкого блока (константа PORTION). В случае, если она равна трети "полезного" числа строк, ко времени полезной обработки блока добавляется еще две трети времени обработки "мусора", что значительно снижает итоговое быстродействие. А ведь "мусор" приходится еще и копировать в сопроцессор, что совсем уже плохо (копирование у нас - узкое место). В общем, чем меньше доля "мусорных"строк в общем объеме мелкого блока, тем лучше. Но объем внутренней памяти сопроцессора ограничен. Значит, при данном объеме внутренней памяти, обработка большого числа коротких строк гораздо выгоднее, чем обработка небольшого числа длинных строк. Но строка сетки имеет совершенно конкретную ширину, а приведенный в Разделах 6 и 7 текст функции itstep_parts() формирует мелкие блоки всегда из целых строк, даже если строки - длинные. Очевидно, более эффективная и универсальная версия itstep_parts() должна нарезать сетку на мелкие блоки по обоим измерениям, на манер "шахматной доски", организуя перекрытие столбцов по тем же правилам, по которым она сейчас организует перекрытие строк. Это позволит обрабатывать сетки произвольной ширины, обеспечивая при этом внутри мелких блоков более выгодное, чем сейчас, соотношение между размером блока и длиной его строки.

К сожалению, такое усовершенствование сделает текст функции itstep_parts(), и так не блещущий краткостью и понятностью, еще более сложным. Тогда придется подумать о том, чтобы, написав эту сложную логику один раз, сделать ее не примером для подражания и предметом многословных пояснений, а частью какой-то библиотеки, "внутренняя кухня" которой скрыта от прикладного программиста, а внешняя функциональность применима для достаточно широкого класса задач.

 
 
 
 
 
 
 
  Тел. +7(499)220-79-72; E-mail: inform@kiam.ru