RdKafkaCaster.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\VarDumper\Caster;
  11. use RdKafka\Conf;
  12. use RdKafka\Exception as RdKafkaException;
  13. use RdKafka\KafkaConsumer;
  14. use RdKafka\Message;
  15. use RdKafka\Metadata\Broker as BrokerMetadata;
  16. use RdKafka\Metadata\Collection as CollectionMetadata;
  17. use RdKafka\Metadata\Partition as PartitionMetadata;
  18. use RdKafka\Metadata\Topic as TopicMetadata;
  19. use RdKafka\Topic;
  20. use RdKafka\TopicConf;
  21. use RdKafka\TopicPartition;
  22. use Symfony\Component\VarDumper\Cloner\Stub;
  23. /**
  24. * Casts RdKafka related classes to array representation.
  25. *
  26. * @author Romain Neutron <imprec@gmail.com>
  27. *
  28. * @internal since Symfony 7.3
  29. */
  class RdKafkaCaster
  {
      /**
       * Adds a consumer's subscription, assignment and cluster metadata to $a.
       *
       * A RdKafka exception raised while reading the assignment is swallowed and
       * an empty array is reported instead. Keys already present in $a are kept.
       */
      public static function castKafkaConsumer(KafkaConsumer $c, array $a, Stub $stub, bool $isNested): array
      {
          try {
              $assigned = $c->getAssignment();
          } catch (RdKafkaException) {
              $assigned = [];
          }

          $virtual = [
              Caster::PREFIX_VIRTUAL.'subscription' => $c->getSubscription(),
              Caster::PREFIX_VIRTUAL.'assignment' => $assigned,
          ];

          return $a + $virtual + self::extractMetadata($c);
      }

      /**
       * Adds the topic name to $a; keys already present in $a are kept.
       */
      public static function castTopic(Topic $c, array $a, Stub $stub, bool $isNested): array
      {
          return $a + [
              Caster::PREFIX_VIRTUAL.'name' => $c->getName(),
          ];
      }

      /**
       * Adds the offset, partition and topic of a topic partition to $a;
       * keys already present in $a are kept.
       */
      public static function castTopicPartition(TopicPartition $c, array $a): array
      {
          return $a + [
              Caster::PREFIX_VIRTUAL.'offset' => $c->getOffset(),
              Caster::PREFIX_VIRTUAL.'partition' => $c->getPartition(),
              Caster::PREFIX_VIRTUAL.'topic' => $c->getTopic(),
          ];
      }

      /**
       * Adds the value of Message::errstr() to $a; keys already present in $a are kept.
       */
      public static function castMessage(Message $c, array $a, Stub $stub, bool $isNested): array
      {
          return $a + [
              Caster::PREFIX_VIRTUAL.'errstr' => $c->errstr(),
          ];
      }

      /**
       * Copies every entry returned by Conf::dump() into $a.
       *
       * Unlike the other casters, dumped entries overwrite same-named keys of $a.
       */
      public static function castConf(Conf $c, array $a, Stub $stub, bool $isNested): array
      {
          foreach ($c->dump() as $name => $setting) {
              $a[Caster::PREFIX_VIRTUAL.$name] = $setting;
          }

          return $a;
      }

      /**
       * Copies every entry returned by TopicConf::dump() into $a.
       *
       * Unlike the other casters, dumped entries overwrite same-named keys of $a.
       */
      public static function castTopicConf(TopicConf $c, array $a, Stub $stub, bool $isNested): array
      {
          foreach ($c->dump() as $name => $setting) {
              $a[Caster::PREFIX_VIRTUAL.$name] = $setting;
          }

          return $a;
      }

      /**
       * Adds the value of getOutQLen() and the cluster metadata to $a;
       * keys already present in $a are kept.
       */
      public static function castRdKafka(\RdKafka $c, array $a, Stub $stub, bool $isNested): array
      {
          $virtual = [
              Caster::PREFIX_VIRTUAL.'out_q_len' => $c->getOutQLen(),
          ];

          return $a + $virtual + self::extractMetadata($c);
      }

      /**
       * Adds the items of a metadata collection to $a under their own keys
       * (no prefix is applied); keys already present in $a are kept.
       */
      public static function castCollectionMetadata(CollectionMetadata $c, array $a, Stub $stub, bool $isNested): array
      {
          return $a + iterator_to_array($c);
      }

      /**
       * Adds the name and partitions of a topic metadata entry to $a;
       * keys already present in $a are kept.
       */
      public static function castTopicMetadata(TopicMetadata $c, array $a, Stub $stub, bool $isNested): array
      {
          return $a + [
              Caster::PREFIX_VIRTUAL.'name' => $c->getTopic(),
              Caster::PREFIX_VIRTUAL.'partitions' => $c->getPartitions(),
          ];
      }

      /**
       * Adds the id, error and leader of a partition metadata entry to $a;
       * keys already present in $a are kept.
       */
      public static function castPartitionMetadata(PartitionMetadata $c, array $a, Stub $stub, bool $isNested): array
      {
          return $a + [
              Caster::PREFIX_VIRTUAL.'id' => $c->getId(),
              Caster::PREFIX_VIRTUAL.'err' => $c->getErr(),
              Caster::PREFIX_VIRTUAL.'leader' => $c->getLeader(),
          ];
      }

      /**
       * Adds the id, host and port of a broker metadata entry to $a;
       * keys already present in $a are kept.
       */
      public static function castBrokerMetadata(BrokerMetadata $c, array $a, Stub $stub, bool $isNested): array
      {
          return $a + [
              Caster::PREFIX_VIRTUAL.'id' => $c->getId(),
              Caster::PREFIX_VIRTUAL.'host' => $c->getHost(),
              Caster::PREFIX_VIRTUAL.'port' => $c->getPort(),
          ];
      }

      /**
       * Fetches the metadata of a client and returns it as prefixed entries.
       *
       * Returns an empty array when getMetadata() throws a RdKafka exception, so
       * that dumping still works when the metadata cannot be read.
       */
      private static function extractMetadata(KafkaConsumer|\RdKafka $c): array
      {
          try {
              // NOTE(review): arguments are presumably (all topics, no single topic, 500 ms timeout) - confirm against the php-rdkafka docs.
              $metadata = $c->getMetadata(true, null, 500);
          } catch (RdKafkaException) {
              return [];
          }

          return [
              Caster::PREFIX_VIRTUAL.'orig_broker_id' => $metadata->getOrigBrokerId(),
              Caster::PREFIX_VIRTUAL.'orig_broker_name' => $metadata->getOrigBrokerName(),
              Caster::PREFIX_VIRTUAL.'brokers' => $metadata->getBrokers(),
              Caster::PREFIX_VIRTUAL.'topics' => $metadata->getTopics(),
          ];
      }
  }