Posted on: March 10, 2021 07:13 PM
Posted by: Renato
Views: 2549
Como buscar dados do AWS Athena usando PHP [Laravel(PHP)]
Como buscar dados do AWS Athena usando PHP [Laravel(PHP)]
`How to fetch data from AWS Athena using PHP`
# O que é AWS Athena? [What is AWS Athena?]
R: O Amazon Athena é um serviço de consultas interativas que facilita a análise de dados no Amazon S3 usando SQL padrão. O Athena não precisa de servidor. Portanto, não há infraestrutura para gerenciar e você paga apenas pelas consultas executadas. O Athena é fácil de usar.
Video: https://www.youtube.com/watch?v=X9-ahfoLEaM
COMO FUNCIONA E O QUE É O AMAZON ATHENA
O Amazon Athena é um serviço que permite que um analista de dados realize consultas interativas na nuvem pública da AWS em dados armazenados no S3. Como o Athena é um serviço de consulta sem servidor, um analista não precisa gerenciar nenhuma infraestrutura de computação para usá-lo.
Também não há necessidade de carregar dados S3 no Amazon Athena ou transformá-los para análise, o que torna mais fácil e rápido para um analista obter insight. Um analista de dados acessa a Athena por meio do Console de gerenciamento da AWS, de uma interface de programação de aplicativos (API) ou de um driver de conexão JAVA. Apenas defina o esquema e comece a executar consultas SQL nos dados do S3.
Um administrador pode gerenciar o acesso à Athena por meio das políticas de gerenciamento de identidades e acesso da AWS, listas de controle de acesso e políticas de bucket do Amazon S3. Um usuário do Athena pode consultar dados criptografados com chaves gerenciadas pelo AWS Key Management Service e também pode criptografar os resultados da consulta. O Athena também permite o acesso entre contas aos buckets do S3 de outro usuário.
Além disso, a Athena usa catálogos de dados gerenciados para armazenar informações e esquemas relacionados às suas pesquisas nos dados do Amazon S3.
Tipos de dados suportados e integração
A Amazon Athena conta com o mecanismo de consulta SQL de código aberto distribuído pela Presto para permitir análises rápidas ad-hoc e solicitações mais complexas, incluindo funções de janela, grandes junções e agregações . A Athena pode processar tipos de dados estruturados e não estruturados, incluindo formatos como CSV, JSON, ORC, Parquet e Avro. O Athena também suporta dados compactados nos formatos Snappy, Zlib, LZO e GZIP.
O Athena integra-se a outros serviços no portfólio da AWS. Por exemplo, você pode usá-lo com o Amazon QuickSight para visualizar dados ou com o AWS Glue para ativar recursos de catálogo de dados mais sofisticados, como repositório de metadados, esquema automatizado e reconhecimento de partições e pipelines de dados baseados em Python . A própria Athena usa o Amazon S3 como um armazenamento de dados subjacente, que fornece redundância de dados.
Aqui abaixo vou apresentar como fiz para poder usar com PHP o Athenas da AWS
```sql
SELECT * FROM "sampledb"."elb_logs" limit 10;
```
Rotas:
- http://localhost:8000/api/athena/get
```php
Route::prefix('athena')->group(function () {
Route::post('get', [AthenaController::class, 'getData']);
});
```
```
Body:```json
{
"machine": "Case_Erector_Machine"
}
```
Aqui um modelo de exemplo que funcionou perfeito. Usar com Laravel(PHP) o controller
```php
use Illuminate\Http\Request;
use AWS;
class AthenaController extends Controller
{
public function getData(Request $request)
{
$params = $request->all();
try {
$athena = AWS::createClient('athena');
$result1 = $athena->StartQueryExecution(
array(
"QueryExecutionContext" => array("Database" => 'demo_db'),
//"QueryString" => 'SELECT * FROM flow_pkg_machine ORDER BY timestamp DESC limit 50',
"QueryString" => "SELECT * FROM flow_pkg_machine WHERE machine = '" . $params['machine'] . "' ORDER BY timestamp DESC LIMIT 50",
"ResultConfiguration" => array(
"EncryptionConfiguration" => array("EncryptionOption" => "SSE_S3"),
"OutputLocation" => 's3://..........',
),
)
);
$QueryExecutionId = $result1->get('QueryExecutionId');
$this->waitForQueryToComplete($QueryExecutionId);
$result1 = $athena->GetQueryResults(array(
'QueryExecutionId' => $QueryExecutionId,
'MaxResults' => 500,
));
$data = $result1->get('ResultSet');
$res = $data['Rows'];
$records = [];
for ($i = 0; $i < count($res); $i++) {
$record = [
"timestamp" => intval($res[$i]['Data'][0]['VarCharValue']),
"machine" => $res[$i]['Data'][1]['VarCharValue'],
"variable" => $res[$i]['Data'][2]['VarCharValue'],
"value" => (double)$res[$i]['Data'][3]['VarCharValue'],
];
array_push($records, $record);
}
array_shift($records);
return response()->json(['data' => $records]);
} catch (AwsException $e) {
error_log($e->getMessage());
}
}
public function waitForQueryToComplete($QueryExecutionId)
{
$athena = AWS::createClient('athena');
while (1) {
$result = $athena->getQueryExecution(array('QueryExecutionId' => $QueryExecutionId));
$res = $result->toArray();
//echo $res['QueryExecution']['Status']['State'].'
';
if ($res['QueryExecution']['Status']['State'] == 'FAILED') {
echo 'Query Failed';
die;
} else if ($res['QueryExecution']['Status']['State'] == 'CANCELED') {
echo 'Query was cancelled';
die;
} else if ($res['QueryExecution']['Status']['State'] == 'SUCCEEDED') {
break; // break while loop
}
}
}
}
```
Metodo que quais funcionou aqui abaixo:
```php
public function index()
{
$id = 0;
$athena = AWS::createClient('athena');
//\Log::info(json_encode($athena));
$query = 'SELECT * FROM "flow_pkg" limit 10';
//\Log::info(json_encode($query));
$result = $athena->startQueryExecution([
'QueryExecutionContext' => [
'Database' => 'demo_db',
],
'QueryString' => 'SELECT * FROM flow_pkg_machine LIMIT 10', // REQUIRED
'ResultConfiguration' => [ // REQUIRED
'EncryptionConfiguration' => [
'EncryptionOption' => 'SSE_S3', // REQUIRED
],
'OutputLocation' => 's3://teste-demo/demo/tables/', // REQUIRED
],
]);
// check completion : getQueryExecution()
$exId = $result['QueryExecutionId'];
//$id = $result['QueryExecutionId'];
$checkExecution = $athena->getQueryExecution([
'QueryExecutionId' => $exId, // REQUIRED
]);
//dd($checkExecution["QueryExecution"]["ResultConfiguration"]["OutputLocation"]);
//sleep(10);
/*switch ($checkExecution["QueryExecution"]["Status"]["State"]) {
case 'SUCCEEDED':
dump('ok!');
break;
case 'RUNNING':
dump('sleep(30)');
sleep(30);
if ($checkExecution["QueryExecution"]["Status"]["State"] == 'RUNNING') {
sleep(20);
} elseif($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') {
sleep(30);
}
break;
case 'QUEUED':
sleep(20);
break;
case 'FAILED':
dump('FAILED');
break;
default:
# code...
sleep(20);
break;
}*/
do {
if ($checkExecution["QueryExecution"]["Status"]["State"] == 'RUNNING') {
dump('sleep(30)');
sleep(30);
break;
} elseif ($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') {
sleep(30);
if ($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') {
sleep(20);
}
}
} while (0);
if ($checkExecution["QueryExecution"]["Status"]["State"] == 'SUCCEEDED') // 'QUEUED|RUNNING|SUCCEEDED|FAILED|CANCELLED',
{
//dd($result['QueryExecutionId']);
$dataOutput = $athena->getQueryResults([
'QueryExecutionId' => $result['QueryExecutionId'], // REQUIRED
]);
dd($dataOutput);
while (($data = fgetcsv($dataOutput, 1000, ",")) !== false) {
$num = count($data);
//dd($data);
echo "
$num fields in line $row:
\n";
$row++;
for ($c = 0; $c < $num; $c++) {
echo $data[$c] . "
\n";
}
}
}
// elseif($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') {
// sleep(40);
// } else {
// sleep(10);
// }
return response()->json(['status' => $checkExecution["QueryExecution"]["Status"]["State"]]);
}
```
Outro metodo que tambem utilizei, mas não retornou valores que precisava.
```php
use AWS;
class athenaController extends Controller
{
public function getData()
{
try {
$athena = AWS::createClient('athena');
$result1 = $athena->StartQueryExecution(
array(
"QueryExecutionContext" => array("Database" => 'demo_db'),
"QueryString" => 'SELECT * FROM flow_pkg LIMIT 10',
"ResultConfiguration" => array(
"EncryptionConfiguration" => array("EncryptionOption" => "SSE_S3"),
"OutputLocation" => 's3://........, // '. $result_logs,
),
)
);
$QueryExecutionId = $result1->get('QueryExecutionId');
\Log::info($QueryExecutionId);
$this->waitForQueryToComplete($QueryExecutionId);
//dd($QueryExecutionId);
$result1 = $athena->GetQueryResults(array(
'QueryExecutionId' => $QueryExecutionId, // REQUIRED
'MaxResults' => 500,
));
\Log::info(json_encode($result1));
$data = $result1->get('ResultSet');
$res = $data['Rows'];
json_encode($data['Rows']);
//dd($res);
return response()->json($data['Rows']);
while (true) {
if ($result1->get('NextToken') == null) {
break;
}
$result1 = $athena->GetQueryResults(array(
'QueryExecutionId' => $QueryExecutionId, // REQUIRED
'NextToken' => $result1->get('NextToken'),
'MaxResults' => 500,
));
$data = $result1->get('ResultSet');
$res = array_merge($res, $data['Rows']);
}
$resData = $this->processResultRows($res);
return $resData;
} catch (\Exception $e) {
// Catch an S3 specific exception.
echo $e->getMessage();
} catch (AwsException $e) {
// This catches the more generic AwsException. You can grab information
// from the exception using methods of the exception object.
echo $e->getAwsRequestId() . "\n";
echo $e->getAwsErrorType() . "\n";
echo $e->getAwsErrorCode() . "\n";
error_log($e->getMessage());
}
}
public function waitForQueryToComplete($QueryExecutionId)
{
$athena = AWS::createClient('athena');
while (1) {
$result = $athena->getQueryExecution(array('QueryExecutionId' => $QueryExecutionId));
$res = $result->toArray();
//echo $res['QueryExecution']['Status']['State'].'
';
if ($res['QueryExecution']['Status']['State'] == 'FAILED') {
echo 'Query Failed';
die;
} else if ($res['QueryExecution']['Status']['State'] == 'CANCELED') {
echo 'Query was cancelled';
die;
} else if ($res['QueryExecution']['Status']['State'] == 'SUCCEEDED') {
break; // break while loop
}
}
}
/*
* function to process data
*/
public function processResultRows($res)
{
$result = array();
$resul_array = array();
for ($i = 0; $i <= count($res); $i++) {
for ($n = 0; $n < count($res[$i]['Data']); $n++) {
if ($i == 0) {
$result[] = $res[$i]['Data'][$n]['VarCharValue'];
} else {
$resul_array[$i][$result[$n]] = $res[$i]['Data'][$n]['VarCharValue'];
}
}
}
echo 'resul_array_cnt: '.count($resul_array).'
';
return $resul_array;
}
}
```
Fontes onde pesquisei:
- https://amolmatkars.wordpress.com/2017/07/20/how-to-fetch-data-from-aws-athena-using-php/comment-page-1/?unapproved=169&moderation-hash=486731d61ff8ca48b1414df250865850#comment-169
- https://aws.amazon.com/documentation/sdk-for-php/
- https://docs.aws.amazon.com/athena/latest/ug/select.html
- https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-athena-2017-05-18.html#getqueryresults
- https://www.php.net/manual/pt_BR/function.array-shift.php
- https://titanwolf.org/Network/Articles/Article?AID=d9e0d23a-28a8-4390-8a33-7513fcc45f64#gsc.tab=0
- https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-athena-2017-05-18.html#startqueryexecution
- https://stackoverflow.com/questions/47568281/return-json-from-athena-query-via-api
- https://stackoverflow.com/questions/55345016/how-to-use-athena-with-laravel/66548012#66548012
- https://packagist.org/packages/alipeng/laravel-athena
- https://github.com/alipeng/laravel-athena
- https://github.com/flemzord/laravel-athena
- https://github.com/Influo/laravel-athena
- https://mugnos-it.com/
# Exemplo que não deu certo
```php
Our database config:
'test_athena' => [
'driver' => 'odbc',
'dsn' => 'odbc:Driver=/opt/simba/athenaodbc/lib/64/libathenaodbc_sb64.so;'
.'AwsRegion=us-east-1;'
.'AuthenticationType=IAM Credentials;'
.'UID=;'
.'PWD=;'
.'S3OutputLocation=s3:///;',
'host' => env('ATHENA_HOST', 'localhost'),
'port' => env('ATHENA_PORT', '5432'),
'database' => env('ATHENA_DATABASE', 'forge'),
'username' => env('ATHENA_USERNAME', 'forge'),
'password' => env('ATHENA_PASSWORD', ''),
'charset' => 'utf8',
'prefix' => '',
'schema' => 'public',
'options' => [
\PDO::ATTR_EMULATE_PREPARES => true,
],
```
```php
$pdo = DB::connection('test_athena')->getPdo();
// this crashes
$pdo->prepare('select * from "qa_lines_csv" where "id" > ? limit 1')->execute([100]);
// this does not crash
$pdo->prepare('select * from "qa_lines_csv" where "id" > 100 limit 1')->execute([]);
// this crashes
DB::connection('test_athena')->table('qa_lines_csv')->where('id', '>', 100)->first()
```
Donate to Site
Renato
Developer