sgykfjsm.github.com

Hosebird Clientを試す

Hosebird ClientはTwitter社製のJava HTTPクライアント
Twitter社が作ってるぐらいだからTwitterとの連携が楽にできるんじゃないかと思って、Scalaでサンプルを試す。

hbcはhbc-coreとhbc-twitter4jの2つで構成されている。
hbc-coreはメッセージ・キューを使って、ナマのストリングメッセージをポーリングすることができる。
hbc-twitter4jはtwitter4jのリスナーを使って、メッセージキューの先頭に解析レイヤーを提供するデータ・モデルを提供している。

とりあえず始める

まずはhbcを取り込むためにbuild.sbtに以下の2行を追加

1
2
3
libraryDependencies += "com.twitter" % "hbc-core" % "1.3.0"

libraryDependencies += "com.twitter" % "hbc-twitter4j" % "1.3.0"

これを追加した後はsbtを起動し、モジュールをダウンロードする。 intellijを使っている場合は、忘れずにgen-ideaをしておくこと。

SampleStreamExampleをScala風に写経してみる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package hbc

import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint
import com.twitter.hbc.httpclient.auth.OAuth1
import com.twitter.hbc.httpclient.BasicClient
import com.twitter.hbc.core.Constants
import com.twitter.hbc.core.processor.StringDelimitedProcessor
import com.twitter.hbc.ClientBuilder
import net.liftweb.json._
import net.liftweb.json.JsonAST.{JString, JNothing}

class SampleStreamExample {
  def hbc_oauth(consumerKey: String, consumerSecret: String,
                token: String, secret: String) {
    val queue = new LinkedBlockingQueue[String](10)
    val endpoint = new StatusesSampleEndpoint()
    // see at https://dev.twitter.com/docs/streaming-apis/parameters#stall_warnings
    endpoint.stallWarnings(false)
    val auth = new OAuth1(consumerKey, consumerSecret, token, secret)

    val client: BasicClient = new ClientBuilder()
      .name("sampleExamplClient")
      .hosts(Constants.STREAM_HOST)
      .endpoint(endpoint)
      .authentication(auth)
      .processor(new StringDelimitedProcessor(queue))
      .build()

    client.connect()

    for (i <- 0 until 10) {
      if (client.isDone)
        println("Client connnection closed unexpectedly: %s".format(client.getExitEvent.getMessage))
      else {
        val msg: String = queue.poll(5, TimeUnit.SECONDS)
        val res = parse(msg)
//        println(pretty(render(res)))
        res match {
          case r if (r \ "delete" == JNothing) => {
            // http://stackoverflow.com/questions/4169153/what-is-the-most-straightforward-way-to-parse-json-in-scala/4169292#4169292
            val JString(created_at) = r \ "created_at"
            val JString(text) = r \ "text"
            val JString(name) = r \ "user" \ "name"
            val JString(screen_name) = r \ "user" \ "screen_name"
            println("%s(%s):\n  %s(%s)".format(screen_name, name, text, created_at))
          }
          case r if (r \ "delete" != JNothing) => println("Sorry.this tweet has deleted.")
          case _ => println("Did not recieve a message in 5 seconds")
        }
      }
    }
    client.stop()
    println("The client read %d messages!".format(client.getStatsTracker.getNumMessages))
  }

}

以下のスクリプトをMainにして、これをキックするようにする。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package hbc


Object Run extends App {
  val consumerKey = "xxxxxxxxxxxxxxxxxxxxxx"
  val consumerSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  val token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  val tokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

  val sse = new SampleStreamExample
  try {
    sse.hbc_oauth(consumerKey, consumerSecret, token, tokenSecret)
  } catch {
    case e: InterruptedException => println(e)
  }

}

なんということでしょう。サンプルのJavaコードそのままです。
Scala力が低すぎる…。個人的にはfor (i <- 0 until 10)のところをどうにかしたい。このままだとあんまりにもアレなので、後半のfor文で特定のフィールドを取り出してみた。
微妙にハマったのは、StatusesSampleEndpointはStreaming APIが垂れ流しているのをそのまま受け取っているだけなので、時々以下のような削除したレスポンスが流れてくる。

1
2
3
4
5
6
7
8
9
10
{
  "delete":{
    "status":{
      "id":318163799519674370,
      "user_id":29877342,
      "id_str":"318163799519674370",
      "user_id_str":"29877342"
    }
  }
}

この場合、case文にあるようなガードをつけていないと、r \ "created_at"などをして値を取り出すときにMatch Errorが発生してしまう。
また、これはHosebirdとは関係ないけど、時々ある特定のフィールドが欠損してしまう場合、以下のようにしておくと良いかもしれない。他の方法があれば教えて欲しい…。

1
val created_at = (r \ "created_at").toOpt.getOrElse(JString("nothing created_at"))

また、case文の最後をアンダースコアでマッチングしているが、サンプルを見るとこの部分に該当するのはif (msg == null)という部分。だからといってcase nullなどとやると以下のようなWarningが出てしまう。

1
2
3
4
5
6
7
8
9
10
11
12
[warn] /Users/sgyk/local/script/scala/scala_example/src/main/scala/hbc/SampleStreamExample.scala:46: match is not exhaustive!
[warn] missing combination         JArray
[warn] missing combination          JBool
[warn] missing combination        JDouble
[warn] missing combination         JField
[warn] missing combination           JInt
[warn] missing combination       JNothing
[warn] missing combination        JObject
[warn] missing combination        JString
[warn]         res match {
[warn]         ^
[warn] one warning found

つまり、マッチングのパターンが足りないよということだけど、だからといってそのまま対応するのはちょっとアレなので、アンダースコアで逃げた。
本当ならこのブログ記事のようにcase classにapplyしたかったけど、そもそもtweetのフィールド数が22個に収まるはずが無いので諦めた。何かうまい方法あるかな?