| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- import cv2
- import pytesseract
- import time
- from datetime import datetime
- from pytesseract import Output
- import cv2
- import pytesseract
- from pytesseract import Output
- import numpy as np
- import re
- import os
- from influxdb import InfluxDBClient
- client = InfluxDBClient(host="influxdb", port=8086, username="influxdb", password="influxdbTSGAMES")
- client.create_database("influxdb")
- client.switch_database('influxdb')
- class SolarMonitor:
- camera = None
- def __init__(self, test = False):
- self.test = test
- if not test:
- self.initCamera()
-
- def parse(self, img):
- custom_oem=r'--oem 3 --psm 11'
- # https://github.com/adrianlazaro8/Tesseract_sevenSegmentsLetsGoDigital
- data = pytesseract.image_to_data(img, lang='lets', config=custom_oem, output_type=Output.DICT)
- print(data)
- results = []
- for i in range(len(data['text'])):
- text = data['text'][i].strip('.,-_')
- text = re.sub('[^0-9]', '', text)
- if text:
- results.append(text)
- if len(results) == 2:
- results = list(map(lambda x: int(x) / 10.,results ))
- if results[0] < 100 and results[1] < 1000:
- return results
- return None
- def thresholding(self, image):
- image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
-
- pixel_values = image.reshape((-1, 1))
- pixel_values = np.float32(pixel_values)
- print(pixel_values.shape)
- # define stopping criteria
- criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 0.2)
- # number of clusters (K)
- k = 3
- _, labels, (centers) = cv2.kmeans(pixel_values, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)
- # convert back to 8 bit values
- centers = np.uint8(centers)
- # flatten the labels array
- labels = labels.flatten()
- # convert all pixels to the color of the centroids
- segmented_image = centers[labels.flatten()]
- segmented_image = segmented_image.reshape(image.shape)
-
- # disable only the cluster number 2 (turn the pixel into black)
- masked_image = np.copy(image)
- # convert to the shape of a vector of pixel values
- masked_image = masked_image.reshape((-1, 1))
- # color (i.e cluster) to disable
- cluster = 2
- masked_image[labels != cluster] = [0]
- # convert back to original shape
- masked_image = masked_image.reshape(image.shape)
-
- return masked_image
-
-
- image = cv2.medianBlur(image,3)
- #image = np.invert(image)
- return cv2.threshold(image, 140, 255, cv2.THRESH_BINARY)[1]
- return image
- def thresholding2(self, image):
-
- image = cv2.medianBlur(image,7)
- #image = np.invert(image)
- return image
-
- def crop(self, image):
- top=314
- left=230
- height= 150
- width=400
- return image[top : (top + height) , left: (left + width)]
- def initCamera(self):
- from picamera import PiCamera
- self.camera = PiCamera(
- resolution=(800, 600)
- )
- self.camera.awb_mode = 'off'
- self.camera.awb_gains = (1.5, 2.0)
- self.camera.shutter_speed = 60000
- self.camera.iso = 800
- def capture(self):
- file = "/home/pi/solar-monitor/camera/" + str(datetime.now()) + ".jpg"
- self.camera.capture(file)
- print("Captured.")
- img = cv2.imread(file)
- img = self.crop(img)
- return [file, img]
- def writeData(self, results):
- client.write_points([
- {
- "measurement": "solar",
- "tags": {
- "type": "U"
- },
- "fields": {
- "value": results[0]
- }
- },
- {
- "measurement": "solar",
- "tags": {
- "type": "W"
- },
- "fields": {
- "value": results[1]
- }
- }
- ], time_precision='ms')
-
- def run(self):
- if self.test:
- for file in os.listdir('tests'):
- img = cv2.imread('tests/' + file)
- img = self.thresholding2(img)
- results = self.parse(img)
- print(file)
- print(results)
- if results:
- self.writeData(results)
- cv2.imwrite(file + '_r.jpg', img)
- else:
- [file, img] = solar.capture()
- img = self.thresholding2(img)
- results = self.parse(img)
- print(results)
- if results:
- self.writeData(results)
- cv2.imwrite(file + '_r.jpg', img)
- # os.unlink(file)
- time.sleep(2)
- while True:
- solar = SolarMonitor(test = True)
- solar.run()
-
|