Do not speak Portuguese? Translate this site with Google or Bing Translator
Como buscar dados do AWS Athena usando PHP

Posted on: March 10, 2021 07:13 PM

Posted by: Renato

Categories: Laravel PHP aws athena

Views: 2549

Como buscar dados do AWS Athena usando PHP [Laravel(PHP)]

Como buscar dados do AWS Athena usando PHP [Laravel(PHP)]
`How to fetch data from AWS Athena using PHP`

# O que é AWS Athena? [What is AWS Athena?]

R: O Amazon Athena é um serviço de consultas interativas que facilita a análise de dados no Amazon S3 usando SQL padrão. O Athena não precisa de servidor. Portanto, não há infraestrutura para gerenciar e você paga apenas pelas consultas executadas. O Athena é fácil de usar.
Video: https://www.youtube.com/watch?v=X9-ahfoLEaM

COMO FUNCIONA E O QUE É O AMAZON ATHENA


O Amazon Athena é um serviço que permite que um analista de dados realize consultas interativas na nuvem pública da AWS em dados armazenados no S3. Como o Athena é um serviço de consulta sem servidor, um analista não precisa gerenciar nenhuma infraestrutura de computação para usá-lo.

Também não há necessidade de carregar dados S3 no Amazon Athena ou transformá-los para análise, o que torna mais fácil e rápido para um analista obter insight. Um analista de dados acessa a Athena por meio do Console de gerenciamento da AWS, de uma interface de programação de aplicativos (API) ou de um driver de conexão JAVA.  Apenas defina o esquema e comece a executar consultas SQL nos dados do S3.

Um administrador pode gerenciar o acesso à Athena por meio das políticas de gerenciamento de identidades e acesso da AWS, listas de controle de acesso e políticas de bucket do Amazon S3. Um usuário do Athena pode consultar dados criptografados com chaves gerenciadas pelo AWS Key Management Service e também pode criptografar os resultados da consulta. O Athena também permite o acesso entre contas aos buckets do S3 de outro usuário.

Além disso, a Athena usa catálogos de dados gerenciados para armazenar informações e esquemas relacionados às suas pesquisas nos dados do Amazon S3.

Tipos de dados suportados e integração
A Amazon Athena conta com o mecanismo de consulta SQL de código aberto distribuído pela Presto para permitir análises rápidas ad-hoc e solicitações mais complexas, incluindo funções de janela, grandes junções e agregações . A Athena pode processar tipos de dados estruturados e não estruturados, incluindo formatos como CSV, JSON, ORC, Parquet e Avro. O Athena também suporta dados compactados nos formatos Snappy, Zlib, LZO e GZIP.

O Athena integra-se a outros serviços no portfólio da AWS. Por exemplo, você pode usá-lo com o Amazon QuickSight para visualizar dados ou com o AWS Glue para ativar recursos de catálogo de dados mais sofisticados, como repositório de metadados, esquema automatizado e reconhecimento de partições e pipelines de dados baseados em Python . A própria Athena usa o Amazon S3 como um armazenamento de dados subjacente, que fornece redundância de dados.


Aqui abaixo vou apresentar como fiz para poder  usar com PHP o Athenas da AWS

```sql
SELECT * FROM "sampledb"."elb_logs" limit 10;
```

Rotas:
- http://localhost:8000/api/athena/get    

```php
Route::prefix('athena')->group(function () {
        Route::post('get', [AthenaController::class, 'getData']);
});
```    
```
Body:
```json
{
    "machine": "Case_Erector_Machine"

```
Aqui um modelo de exemplo que funcionou perfeito. Usar com Laravel(PHP) o controller

```php
use Illuminate\Http\Request;
use AWS;

class AthenaController extends Controller
{    
    public function getData(Request $request)
    {        
        $params = $request->all();
        
        try {
            $athena = AWS::createClient('athena');            
            $result1 = $athena->StartQueryExecution(
                array(
                    "QueryExecutionContext" => array("Database" => 'demo_db'),
                    //"QueryString" => 'SELECT * FROM flow_pkg_machine ORDER BY timestamp DESC limit 50',
                    "QueryString" => "SELECT * FROM flow_pkg_machine WHERE machine = '" . $params['machine'] . "' ORDER BY timestamp DESC LIMIT 50",
                    "ResultConfiguration" => array(
                        "EncryptionConfiguration" => array("EncryptionOption" => "SSE_S3"),
                        "OutputLocation" => 's3://..........',
                    ),
                )
            );

            $QueryExecutionId = $result1->get('QueryExecutionId');
            
            $this->waitForQueryToComplete($QueryExecutionId);

            $result1 = $athena->GetQueryResults(array(
                'QueryExecutionId' => $QueryExecutionId,
                'MaxResults' => 500,
            ));

            $data = $result1->get('ResultSet');
            $res = $data['Rows'];            
            $records = [];

            for ($i = 0; $i < count($res); $i++) {
                    $record = [
                        "timestamp" => intval($res[$i]['Data'][0]['VarCharValue']),
                        "machine" => $res[$i]['Data'][1]['VarCharValue'],
                        "variable" => $res[$i]['Data'][2]['VarCharValue'],
                        "value" => (double)$res[$i]['Data'][3]['VarCharValue'],
                    ];
                
                array_push($records, $record);
            }
            array_shift($records);

            return response()->json(['data' => $records]);

        } catch (AwsException $e) {
            error_log($e->getMessage());
        }

    }
    public function waitForQueryToComplete($QueryExecutionId)
    {
        $athena = AWS::createClient('athena');
        
        while (1) {
            $result = $athena->getQueryExecution(array('QueryExecutionId' => $QueryExecutionId));
            $res = $result->toArray();

            //echo $res['QueryExecution']['Status']['State'].'
';
            if ($res['QueryExecution']['Status']['State'] == 'FAILED') {
                echo 'Query Failed';
                die;
            } else if ($res['QueryExecution']['Status']['State'] == 'CANCELED') {
                echo 'Query was cancelled';
                die;
            } else if ($res['QueryExecution']['Status']['State'] == 'SUCCEEDED') {
                break; // break while loop
            }

        }
    }

}

```

Metodo que quais funcionou aqui abaixo:

```php
public function index()
    {
        $id = 0;

        $athena = AWS::createClient('athena');
        //\Log::info(json_encode($athena));

        $query = 'SELECT * FROM "flow_pkg" limit 10';
        //\Log::info(json_encode($query));

        $result = $athena->startQueryExecution([
            'QueryExecutionContext' => [
                'Database' => 'demo_db',
            ],
            'QueryString' => 'SELECT * FROM flow_pkg_machine LIMIT 10', // REQUIRED
            'ResultConfiguration' => [ // REQUIRED
                'EncryptionConfiguration' => [
                    'EncryptionOption' => 'SSE_S3', // REQUIRED
                ],
                'OutputLocation' => 's3://teste-demo/demo/tables/', // REQUIRED
            ],
        ]);

        // check completion : getQueryExecution()
        $exId = $result['QueryExecutionId'];
        //$id = $result['QueryExecutionId'];

        $checkExecution = $athena->getQueryExecution([
            'QueryExecutionId' => $exId, // REQUIRED
        ]);
        //dd($checkExecution["QueryExecution"]["ResultConfiguration"]["OutputLocation"]);
        
        //sleep(10);

        /*switch ($checkExecution["QueryExecution"]["Status"]["State"]) {
        case 'SUCCEEDED':
        dump('ok!');
        break;
        case 'RUNNING':
        dump('sleep(30)');
        sleep(30);
        if ($checkExecution["QueryExecution"]["Status"]["State"] == 'RUNNING') {
        sleep(20);
        } elseif($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') {
        sleep(30);
        }
        break;
        case 'QUEUED':
        sleep(20);
        break;
        case 'FAILED':
        dump('FAILED');
        break;
        default:
        # code...
        sleep(20);
        break;
        }*/

        do {
            if ($checkExecution["QueryExecution"]["Status"]["State"] == 'RUNNING') {
                dump('sleep(30)');
                sleep(30);
                break;
            } elseif ($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') {
                sleep(30);
                if ($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') {
                    sleep(20);
                }

            }

        } while (0);

        if ($checkExecution["QueryExecution"]["Status"]["State"] == 'SUCCEEDED') // 'QUEUED|RUNNING|SUCCEEDED|FAILED|CANCELLED',
        {
            //dd($result['QueryExecutionId']);
            $dataOutput = $athena->getQueryResults([
                'QueryExecutionId' => $result['QueryExecutionId'], // REQUIRED
            ]);
            dd($dataOutput);
            while (($data = fgetcsv($dataOutput, 1000, ",")) !== false) {
                $num = count($data);
                //dd($data);
                echo "

$num fields in line $row:

\n";
                $row++;
                for ($c = 0; $c < $num; $c++) {
                    echo $data[$c] . "
\n";
                }
            }

        }
        // elseif($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') {
        //     sleep(40);
        // } else {
        //     sleep(10);
        // }

        return response()->json(['status' => $checkExecution["QueryExecution"]["Status"]["State"]]);

    }
```

Outro metodo que tambem utilizei, mas não retornou valores que precisava.
```php
use AWS;

class athenaController extends Controller
{
    public function getData()
    {
        try {
            $athena = AWS::createClient('athena');
            
            $result1 = $athena->StartQueryExecution(
                array(
                    "QueryExecutionContext" => array("Database" => 'demo_db'),
                    "QueryString" => 'SELECT * FROM flow_pkg LIMIT 10',
                    "ResultConfiguration" => array(
                        "EncryptionConfiguration" => array("EncryptionOption" => "SSE_S3"),
                        "OutputLocation" => 's3://........, // '. $result_logs,
                    ),
                )
            );

            $QueryExecutionId = $result1->get('QueryExecutionId');
            \Log::info($QueryExecutionId);

            $this->waitForQueryToComplete($QueryExecutionId);
            //dd($QueryExecutionId);

            $result1 = $athena->GetQueryResults(array(
                'QueryExecutionId' => $QueryExecutionId, // REQUIRED
                'MaxResults' => 500,
            ));
            \Log::info(json_encode($result1));

            $data = $result1->get('ResultSet');
            $res = $data['Rows'];
            json_encode($data['Rows']);
            //dd($res);

            return response()->json($data['Rows']);

            while (true) {

                if ($result1->get('NextToken') == null) {
                    break;
                }

                $result1 = $athena->GetQueryResults(array(
                    'QueryExecutionId' => $QueryExecutionId, // REQUIRED
                    'NextToken' => $result1->get('NextToken'),
                    'MaxResults' => 500,
                ));

                $data = $result1->get('ResultSet');
                $res = array_merge($res, $data['Rows']);
            }

            $resData = $this->processResultRows($res);

            return $resData;

        } catch (\Exception $e) {
            // Catch an S3 specific exception.
            echo $e->getMessage();
        } catch (AwsException $e) {
            // This catches the more generic AwsException. You can grab information
            // from the exception using methods of the exception object.

            echo $e->getAwsRequestId() . "\n";
            echo $e->getAwsErrorType() . "\n";
            echo $e->getAwsErrorCode() . "\n";
            error_log($e->getMessage());
        }

    }
    public function waitForQueryToComplete($QueryExecutionId)
    {
        $athena = AWS::createClient('athena');
        
        while (1) {
            $result = $athena->getQueryExecution(array('QueryExecutionId' => $QueryExecutionId));
            $res = $result->toArray();

            //echo $res['QueryExecution']['Status']['State'].'
';
            if ($res['QueryExecution']['Status']['State'] == 'FAILED') {
                echo 'Query Failed';
                die;
            } else if ($res['QueryExecution']['Status']['State'] == 'CANCELED') {
                echo 'Query was cancelled';
                die;
            } else if ($res['QueryExecution']['Status']['State'] == 'SUCCEEDED') {
                break; // break while loop
            }

        }
    }

    /*
     * function to process data
     */
    public function processResultRows($res)
    {
        $result = array();
        $resul_array = array();

        for ($i = 0; $i <= count($res); $i++) {
            for ($n = 0; $n < count($res[$i]['Data']); $n++) {
                if ($i == 0) {
                    $result[] = $res[$i]['Data'][$n]['VarCharValue'];
                } else {
                    $resul_array[$i][$result[$n]] = $res[$i]['Data'][$n]['VarCharValue'];
                }
            }
        }

        echo 'resul_array_cnt: '.count($resul_array).'
';
        return $resul_array;
    }

}

```

Fontes onde pesquisei:

- https://amolmatkars.wordpress.com/2017/07/20/how-to-fetch-data-from-aws-athena-using-php/comment-page-1/?unapproved=169&moderation-hash=486731d61ff8ca48b1414df250865850#comment-169
- https://aws.amazon.com/documentation/sdk-for-php/
- https://docs.aws.amazon.com/athena/latest/ug/select.html
- https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-athena-2017-05-18.html#getqueryresults
- https://www.php.net/manual/pt_BR/function.array-shift.php
- https://titanwolf.org/Network/Articles/Article?AID=d9e0d23a-28a8-4390-8a33-7513fcc45f64#gsc.tab=0
- https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-athena-2017-05-18.html#startqueryexecution
- https://stackoverflow.com/questions/47568281/return-json-from-athena-query-via-api
- https://stackoverflow.com/questions/55345016/how-to-use-athena-with-laravel/66548012#66548012
- https://packagist.org/packages/alipeng/laravel-athena
- https://github.com/alipeng/laravel-athena
- https://github.com/flemzord/laravel-athena
- https://github.com/Influo/laravel-athena
- https://mugnos-it.com/


# Exemplo que não deu certo

```php
Our database config:

        'test_athena' => [
            'driver' => 'odbc',
            'dsn' => 'odbc:Driver=/opt/simba/athenaodbc/lib/64/libathenaodbc_sb64.so;'
                    .'AwsRegion=us-east-1;'
                    .'AuthenticationType=IAM Credentials;'
                    .'UID=;'
                    .'PWD=;'
                    .'S3OutputLocation=s3:///;',
            'host' => env('ATHENA_HOST', 'localhost'),
            'port' => env('ATHENA_PORT', '5432'),
            'database' => env('ATHENA_DATABASE', 'forge'),
            'username' => env('ATHENA_USERNAME', 'forge'),
            'password' => env('ATHENA_PASSWORD', ''),
            'charset' => 'utf8',
            'prefix' => '',
            'schema' => 'public',
            'options' => [
                \PDO::ATTR_EMULATE_PREPARES => true,
            ],
```

```php            
$pdo = DB::connection('test_athena')->getPdo();

// this crashes
$pdo->prepare('select * from "qa_lines_csv" where "id" > ? limit 1')->execute([100]);

// this does not crash
$pdo->prepare('select * from "qa_lines_csv" where "id" > 100 limit 1')->execute([]);

// this crashes
DB::connection('test_athena')->table('qa_lines_csv')->where('id', '>', 100)->first()
```


1

Share

Donate to Site


About Author

Renato

Developer

Add a Comment

Blog Search


Categories

OUTROS (16) Variados (109) PHP (133) Laravel (171) Black Hat (3) front-end (29) linux (114) postgresql (39) Docker (28) rest (5) soap (1) webservice (6) October (1) CMS (2) node (7) backend (13) ubuntu (56) devops (25) nodejs (5) npm (3) nvm (1) git (8) firefox (1) react (7) reactnative (5) collections (1) javascript (7) reactjs (8) yarn (0) adb (1) Solid (2) blade (3) models (1) controllers (0) log (1) html (2) hardware (3) aws (14) Transcribe (2) transcription (1) google (4) ibm (1) nuance (1) PHP Swoole (5) mysql (31) macox (4) flutter (1) symfony (1) cor (1) colors (2) homeOffice (2) jobs (3) imagick (2) ec2 (1) sw (1) websocket (2) markdown (1) ckeditor (1) tecnologia (14) faceapp (1) eloquent (14) query (4) sql (40) ddd (3) nginx (9) apache (4) certbot (1) lets-encrypt (3) debian (12) liquid (1) magento (2) ruby (1) LETSENCRYPT (1) Fibonacci (1) wine (1) transaction (1) pendrive (1) boot (1) usb (1) prf (1) policia (2) federal (1) lucena (1) mongodb (4) paypal (1) payment (1) zend (1) vim (4) ciencia (6) js (1) nosql (1) java (1) JasperReports (1) phpjasper (1) covid19 (1) saude (1) athena (1) cinnamon (1) phpunit (2) binaural (1) mysqli (3) database (42) windows (6) vala (1) json (2) oracle (1) mariadb (4) dev (12) webdev (24) s3 (4) storage (1) kitematic (1) gnome (2) web (2) intel (3) piada (1) cron (2) dba (18) lumen (1) ffmpeg (2) android (2) aplicativo (1) fedora (2) shell (4) bash (3) script (3) lider (1) htm (1) csv (1) dropbox (1) db (3) combustivel (2) haru (1) presenter (1) gasolina (1) MeioAmbiente (1) Grunt (1) biologia (1) programming (22) performance (3) brain (1) smartphones (1) telefonia (1) privacidade (1) opensource (3) microg (1) iode (1) ssh (3) zsh (2) terminal (3) dracula (1) spaceship (1) mac (2) idiomas (1) laptop (2) developer (37) api (5) data (1) matematica (1) seguranca (2) 100DaysOfCode (9) hotfix (1) documentation (1) laravelphp (10) RabbitMQ (3) Elasticsearch (1) redis (2) Raspberry (4) Padrao de design (4) JQuery (1) angularjs (4) Dicas (43) Kubernetes (3) vscode (2) backup (1) angular (3) servers (2) pipelines (1) AppSec (1) DevSecOps (4) rust (1) RustLang (1) Mozilla (1) algoritimo (1) sqlite (1) Passport (2) jwt (5) security (2) translate (1) kube (2) iot (1) politica (2) bolsonaro (1) flow (1) podcast (1) Brasil (1) containers (3) traefik (1) networking (1) host (1) POO (2) microservices (2) bug (1) cqrs (1) arquitetura (3) Architecture (4) sail (3) militar (1) artigo (1) economia (1) forcas armadas (1) ffaa (1) autenticacao (2) autorizacao (2) authentication (4) authorization (3) NoCookies (1) wsl (4) memcached (1) macos (2) unix (2) kali-linux (1) linux-tools (5) apple (1) noticias (2) composer (1) rancher (1) k8s (1) escopos (1) orm (1) jenkins (4) github (5) gitlab (3) queue (1) Passwordless (1) sonarqube (1) phpswoole (1) laraveloctane (1) Swoole (1) Swoole (1) octane (1) Structurizr (1) Diagramas (1) c4 (1) c4-models (1) compactar (1) compression (1) messaging (1) restfull (1) eventdrive (1) services (1) http (1) Monolith (1) microservice (1) historia (1) educacao (1) cavalotroia (1) OOD (0) odd (1) chatgpt (1) openai (3) vicuna (1) llama (1) gpt (1) transformers (1) pytorch (1) tensorflow (1) akitando (1) ia (1) nvidia (1) agi (1) guard (1) multiple_authen (2) rpi (1) auth (1) auth (1) livros (2) ElonMusk (2) Oh My Zsh (1) Manjaro (1) BigLinux (2) ArchLinux (1) Migration (1) Error (1) Monitor (1) Filament (1) LaravelFilament (1) replication (1) phpfpm (1) cache (1) vpn (1) l2tp (1) zorin-os (1) optimization (1) scheduling (1) monitoring (2) linkedin (1) community (1) inteligencia-artificial (2) wsl2 (1) maps (1) API_KEY_GOOGLE_MAPS (1) repmgr (1) altadisponibilidade (1) banco (1) modelagemdedados (1) inteligenciadedados (4) governancadedados (1) bancodedados (2) Observability (1) picpay (1) ecommerce (1) Curisidades (1) Samurai (1) KubeCon (1) GitOps (1) Axios (1) Fetch (1) Deepin (1) vue (4) nuxt (1) PKCE (1) Oauth2 (2) webhook (1) TypeScript (1) tailwind (1) gource (2)

New Articles



Get Latest Updates by Email